Skip to content

Commit

Permalink
Merge pull request #47 from totem/develop
Browse files Browse the repository at this point in the history
0.4.3 Release
  • Loading branch information
sukrit007 committed Feb 10, 2016
2 parents fc50725 + 236d460 commit 7129c36
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 22 deletions.
13 changes: 8 additions & 5 deletions conf/appconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,21 @@
'DEFAULT_CHORD_RETRIES': 30,
'SSH_RETRY_DELAY': 10,
'SSH_RETRIES': 10,
'CHECK_RUNNING_RETRIES': 60,
'CHECK_RUNNING_RETRY_DELAY': 60,
'CHECK_RUNNING_RETRIES': 30,
'CHECK_RUNNING_RETRY_DELAY': 30,
'CHECK_DISCOVERY_RETRIES': 20,
'CHECK_DISCOVERY_RETRY_DELAY': 30,
'LOCK_RETRIES': 64,
'LOCK_RETRIES': 120,
'LOCK_RETRY_DELAY': 60,
'DEPLOYMENT_WAIT_RETRIES': 60,
'DEPLOYMENT_WAIT_RETRIES': 240,
'DEPLOYMENT_WAIT_RETRY_DELAY': 60,
'CHECK_NODE_RETRY_DELAY': 10,
'DEPLOYMENT_STOP_MIN_CHECK_RETRY_DELAY': 2,
'DEFAULT_DEPLOYMENT_STOP_CHECK_RETRIES': 10,
'DEPLOYMENT_CREATE_LIMIT': os.getenv('DEPLOYMENT_CREATE_LIMIT', '1/m'),
'START_CONCURRENCY': os.getenv('START_CONCURRENCY', 4),
'START_CONCURRENCY_RETRIES': 60,
'START_CONCURRENCY_RETRY_DELAY': 60,

}

DEFAULT_CHORD_OPTIONS = {
Expand Down
2 changes: 1 addition & 1 deletion deployer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from celery.signals import setup_logging


__version__ = '0.4.2'
__version__ = '0.4.3'
__author__ = 'sukrit'

deployer.logger.init_logging()
Expand Down
6 changes: 3 additions & 3 deletions deployer/services/deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from fleet.deploy.deployer import filter_units
from conf.appconfig import CLUSTER_NAME, DEPLOYMENT_TYPE_GIT_QUAY, \
DEPLOYMENT_DEFAULTS, TEMPLATE_DEFAULTS, \
DEPLOYMENT_STATE_STARTED, UPSTREAM_DEFAULTS, DEPLOYMENT_TYPE_DEFAULT, \
DISCOVER_UPSTREAM_TTL_DEFAULT
UPSTREAM_DEFAULTS, DEPLOYMENT_TYPE_DEFAULT, \
DISCOVER_UPSTREAM_TTL_DEFAULT, DEPLOYMENT_STATE_NEW
from deployer.fleet import get_fleet_provider
from deployer.services.proxy import get_discovered_nodes
from deployer.services.storage.factory import get_store
Expand Down Expand Up @@ -245,7 +245,7 @@ def apply_defaults(deployment):
app_name = deployment_upd['deployment']['name']
app_version = deployment_upd['deployment']['version']
deployment_upd['id'] = generate_deployment_id(app_name, app_version)
deployment_upd['state'] = DEPLOYMENT_STATE_STARTED
deployment_upd['state'] = DEPLOYMENT_STATE_NEW
deployment_upd['started-at'] = datetime.datetime.utcnow()

app_template = deployment_upd.get('templates').get('app')
Expand Down
25 changes: 21 additions & 4 deletions deployer/tasks/deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
sync_upstreams, sync_units, apply_defaults, clone_deployment
from deployer.tasks import notification
from deployer.tasks.exceptions import NodeNotUndeployed, MinNodesNotRunning, \
NodeCheckFailed, MinNodesNotDiscovered, NodeNotStopped
NodeCheckFailed, MinNodesNotDiscovered, NodeNotStopped, \
MaxStartConcurrencyReached
from deployer.tasks import util

from deployer.services.storage.base import EVENT_NEW_DEPLOYMENT, \
Expand All @@ -42,7 +43,7 @@
DEPLOYMENT_STATE_FAILED, DEPLOYMENT_STATE_PROMOTED, \
LEVEL_STARTED, LEVEL_FAILED, LEVEL_SUCCESS, CLUSTER_NAME, \
DEPLOYMENT_STATE_DECOMMISSIONED, LOCK_JOB_BASE, DEPLOYMENT_TYPE_DEFAULT, \
DEFAULT_CHORD_OPTIONS
DEFAULT_CHORD_OPTIONS, DEPLOYMENT_STATE_STARTED

from deployer.tasks.common import async_wait
from deployer.services.proxy import wire_proxy, register_upstreams, \
Expand Down Expand Up @@ -135,7 +136,7 @@ def _check_discover(self, app_name, app_version, check_port, min_nodes,
return discovered_nodes


@app.task(rate_limit=TASK_SETTINGS['DEPLOYMENT_CREATE_LIMIT'])
@app.task
def create(deployment):
"""
Task for creating deployment.
Expand Down Expand Up @@ -190,7 +191,8 @@ def create(deployment):
_using_lock.si(
search_params,
app_name,
do_task=_pre_create_undeploy.si(
do_task=_start_deployment.si(deployment['id'], TASK_SETTINGS) |
_pre_create_undeploy.si(
deployment,
search_params,
next_task=_register_upstreams.si(
Expand Down Expand Up @@ -881,3 +883,18 @@ def recover_cluster(self, recovery_params):
max_retries=TASK_SETTINGS['DEPLOYMENT_WAIT_RETRIES']),
options=DEFAULT_CHORD_OPTIONS
).delay()


@app.task(bind=True)
def _start_deployment(self, deployment_id, task_settings):
store = get_store()
concurrency = task_settings.get('START_CONCURRENCY')
if concurrency and concurrency > 0:
used_concurrency = len(store.filter_deployments(
only_ids=True, state=DEPLOYMENT_STATE_STARTED))
if used_concurrency >= concurrency:
raise self.retry(
exc=MaxStartConcurrencyReached(concurrency, used_concurrency),
max_retries=task_settings['START_CONCURRENCY_RETRIES'],
countdown=task_settings['START_CONCURRENCY_RETRY_DELAY'])
store.update_state(deployment_id, DEPLOYMENT_STATE_STARTED)
33 changes: 33 additions & 0 deletions deployer/tasks/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,3 +206,36 @@ def __eq__(self, other):
self.response == other.response and \
self.attempts == other.attempts and \
self.url == other.url


class MaxStartConcurrencyReached(Exception):
"""
Exception corresponding to maximum concurrency reached for
starting deployments
"""

def __init__(self, allowed, current):
self.allowed = allowed
self.current = current
self.message = 'Maximum concurrency reached for starting deployment.' \
'Allowed: {} Current: {}'.format(allowed, current)

super(MaxStartConcurrencyReached, self).__init__(allowed, current)

def to_dict(self):
return {
'message': self.message,
'code': 'MAX_START_CONCURRENCY_REACHED',
'details': {
'allowed': self.allowed,
'current': self.current,
}
}

def __str__(self):
return self.message

def __eq__(self, other):
return self.allowed == other.allowed and \
self.message == other.message and \
self.current == other.current
12 changes: 6 additions & 6 deletions tests/unit/services/test_deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
from mock import patch
from nose.tools import eq_, raises
from conf.appconfig import DEPLOYMENT_MODE_BLUEGREEN, DEFAULT_STOP_TIMEOUT, \
TASK_SETTINGS, DEPLOYMENT_STATE_STARTED, NOTIFICATIONS_DEFAULTS, \
CLUSTER_NAME, DISCOVER_UPSTREAM_TTL_DEFAULT
TASK_SETTINGS, NOTIFICATIONS_DEFAULTS, \
CLUSTER_NAME, DISCOVER_UPSTREAM_TTL_DEFAULT, DEPLOYMENT_STATE_NEW
from deployer.services.deployment import get_exposed_ports, \
fetch_runtime_upstreams, apply_defaults, sync_upstreams, sync_units, \
clone_deployment
Expand Down Expand Up @@ -217,7 +217,7 @@ def test_deployment_defaults_for_type_git_quay(mock_time):
'listeners': {},
'upstreams': {}
},
'state': DEPLOYMENT_STATE_STARTED,
'state': DEPLOYMENT_STATE_NEW,
'started-at': NOW,
'security': {
'profile': 'default'
Expand Down Expand Up @@ -373,7 +373,7 @@ def test_deployment_defaults_with_proxy(mock_time):
},
'listeners': {}
},
'state': DEPLOYMENT_STATE_STARTED,
'state': DEPLOYMENT_STATE_NEW,
'started-at': NOW,
'security': {
'profile': 'default'
Expand Down Expand Up @@ -486,7 +486,7 @@ def test_deployment_defaults_for_type_git_quay_with_overrides(mock_time):
'listeners': {},
'upstreams': {}
},
'state': DEPLOYMENT_STATE_STARTED,
'state': DEPLOYMENT_STATE_NEW,
'started-at': NOW,
'security': {
'profile': 'default'
Expand Down Expand Up @@ -595,7 +595,7 @@ def test_deployment_defaults_for_custom_deployment(mock_time):
'listeners': {},
'upstreams': {}
},
'state': DEPLOYMENT_STATE_STARTED,
'state': DEPLOYMENT_STATE_NEW,
'started-at': NOW,
'security': {
'profile': 'default'
Expand Down
80 changes: 77 additions & 3 deletions tests/unit/tasks/test_deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,29 @@
from nose.tools import raises, eq_, assert_raises
from paramiko import SSHException

from conf.appconfig import DEPLOYMENT_MODE_BLUEGREEN, DEPLOYMENT_MODE_REDGREEN
from conf.appconfig import DEPLOYMENT_MODE_BLUEGREEN, \
DEPLOYMENT_MODE_REDGREEN, DEPLOYMENT_STATE_STARTED
from deployer.celery import app
from deployer.tasks.exceptions import NodeNotUndeployed, MinNodesNotRunning, \
NodeCheckFailed, MinNodesNotDiscovered
NodeCheckFailed, MinNodesNotDiscovered, MaxStartConcurrencyReached
from deployer.util import dict_merge
from tests.helper import dict_compare

from deployer.tasks.deployment import _pre_create_undeploy, \
_wait_for_undeploy, _fleet_check_deploy, _check_node, _check_deployment, \
_check_discover
_check_discover, _start_deployment

__author__ = 'sukrit'


NOW = datetime.datetime(2014, 01, 01)
DEFAULT_STOP_TIMEOUT_SECONDS = 30
MOCK_DEPLOYMENT_ID = 'mock-deployment-id'
MOCK_TASK_SETTINGS = {
'START_CONCURRENCY': 4,
'START_CONCURRENCY_RETRIES': 60,
'START_CONCURRENCY_RETRY_DELAY': 60,
}


def test_create():
Expand Down Expand Up @@ -410,3 +418,69 @@ def test_check_discover_for_min_node_criteria_not_met(mock_yoda_cl):
eq_(error.version, 'mockversion')
eq_(error.min_nodes, 2)
dict_compare(error.discovered_nodes, {'node1': 'mockhost1:48080'})


@patch('deployer.tasks.deployment.get_store')
def test_start_deployment(m_get_store):
# Given: Current task instance with mock retry
_start_deployment.retry = MagicMock()
_start_deployment.retry.side_effect = Exception('Mock')

# And: List of currently executing deployments
m_store = m_get_store.return_value
m_store.filter_deployments.return_value = [{'_id': 'id1'}]

# When: I start deployment for given task id and task settings
_start_deployment(MOCK_DEPLOYMENT_ID, MOCK_TASK_SETTINGS)

# Then: Deployment state gets updated as expected
m_store.update_state.assert_called_once_with(MOCK_DEPLOYMENT_ID,
DEPLOYMENT_STATE_STARTED)


@patch('deployer.tasks.deployment.get_store')
@raises(MaxStartConcurrencyReached)
def test_start_deployment_when_concurrency_has_exceeded(m_get_store):
# Given: Current task instance with mock retry
_start_deployment.retry = MagicMock()

def side_effect(**kwargs):
raise kwargs.get('exc')

_start_deployment.retry.side_effect = side_effect

# And: List of currently executing deployments
m_store = m_get_store.return_value
m_store.filter_deployments.return_value = [
{'_id': cnt} for cnt in range(1, 5)]

# When: I start deployment for given task id and task settings
_start_deployment(MOCK_DEPLOYMENT_ID, MOCK_TASK_SETTINGS)

# Then: Deployment state gets updated as expected
eq_(m_store.update_state.call_count, 0,
'update_state should not be called')


@patch('deployer.tasks.deployment.get_store')
def test_start_deployment_with_unlimited_concurrency(m_get_store):
# Given: Current task instance with mock retry
_start_deployment.retry = MagicMock()

def side_effect(**kwargs):
raise kwargs.get('exc')

_start_deployment.retry.side_effect = side_effect

# And: List of currently executing deployments
m_store = m_get_store.return_value
m_store.filter_deployments.return_value = [
{'_id': cnt} for cnt in range(1, 5)]

# When: I start deployment for given task id and task settings
_start_deployment(MOCK_DEPLOYMENT_ID,
dict_merge({'START_CONCURRENCY': 0}, MOCK_TASK_SETTINGS))

# Then: Deployment state gets updated as expected
m_store.update_state.assert_called_once_with(MOCK_DEPLOYMENT_ID,
DEPLOYMENT_STATE_STARTED)

0 comments on commit 7129c36

Please sign in to comment.