Skip to content

Commit

Permalink
Job state transition
Browse files Browse the repository at this point in the history
  • Loading branch information
sukrit007 committed Nov 14, 2014
1 parent 8b07fc7 commit 37a323a
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 36 deletions.
28 changes: 20 additions & 8 deletions conf/appconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,22 @@
},
'default': {
'config': {
'ci': {
'type': 'travis',
'enabled': False,
},
'builder': {
'type': 'image-factory',
'enabled': True
'hooks': {
'ci': {
'travis': {
'enabled': False,
}
},
'builders': {
'image-factory': {
'enabled': True,
}
},
'scm': {
'github': {
'enabled': True
}
}
},
'deployers': {
'default': {
Expand All @@ -102,7 +111,9 @@
'DEFAULT_RETRIES': 5,
'DEFAULT_RETRY_DELAY': 10,
'LOCK_RETRY_DELAY': 5,
'LOCK_RETRIES': 20
'LOCK_RETRIES': 20,
'JOB_WAIT_RETRIES': 10,
'JOB_WAIT_RETRY_DELAY': 10,
}

JOB_SETTINGS = {
Expand All @@ -112,3 +123,4 @@
JOB_STATE_NEW = 'NEW'
JOB_STATE_SCHEDULED = 'SCHEDULED'
JOB_STATE_DEPLOY_REQUESTED = 'DEPLOY_REQUESTED'
JOB_STATE_NOOP = 'NOOP'
113 changes: 85 additions & 28 deletions orchestrator/tasks/job.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
import copy
import uuid
from celery import chain
from conf.appconfig import TASK_SETTINGS, JOB_SETTINGS, JOB_STATE_NEW, \
JOB_STATE_SCHEDULED, JOB_STATE_DEPLOY_REQUESTED
JOB_STATE_SCHEDULED, JOB_STATE_DEPLOY_REQUESTED, JOB_STATE_NOOP
from conf.celeryconfig import CLUSTER_NAME
from orchestrator.celery import app
from orchestrator.etcd import using_etcd
from orchestrator.services import config
from orchestrator.services.distributed_lock import LockService, \
ResourceLockedException
from orchestrator.tasks.search import update_job_state
from orchestrator.tasks.common import async_wait

__author__ = 'sukrit'

SUPPORTED_HOOK_TYPES = ('ci', 'builder')


@app.task
def start_job(owner, repo, ref, commit=None):
def start_job(owner, repo, ref, scm_type='github', commit=None):
"""
Start the Continuous Deployment job
Expand All @@ -25,7 +27,12 @@ def start_job(owner, repo, ref, commit=None):
name='%s-%s-%s-%s' % (CLUSTER_NAME, owner, repo, ref),
do_task=(
update_job_ttl.si(owner, repo, ref) |
create_job.si(job_config, owner, repo, ref)
create_job.si(job_config, owner, repo, ref, scm_type) |
async_wait.s(
default_retry_delay=TASK_SETTINGS[
'JOB_WAIT_RETRY_DELAY'],
max_retries=TASK_SETTINGS['JOB_WAIT_RETRIES']
)
)
).apply_async()

Expand Down Expand Up @@ -112,41 +119,91 @@ def update_ttl(prev_exist=True):
update_ttl(False)


@app.task
@app.task(bind=True)
@using_etcd
def create_job(job_config, owner, repo, ref, commit=None, etcd_cl=None,
etcd_base=None):
def create_job(self, job_config, owner, repo, ref, scm_type, commit=None,
etcd_cl=None, etcd_base=None):
job_base = _job_base_location(owner, repo, ref, etcd_base)

# If we can not get branch, in that case we can not guarantee the
# co-relation of ci jobs with builder, but hooks can still be received.
commit = commit or ''

try:
state = etcd_cl.read(job_base+'/state').value
job_id = etcd_cl.read(job_base+'/job-id').value
except KeyError:
state = JOB_STATE_NEW
job_id = uuid.uuid4()

job = _as_job(job_config, job_id, owner, repo, ref, scm_type, commit)

if state in (JOB_STATE_NEW, JOB_STATE_SCHEDULED):

scm_hooks = [name for name, hook in job_config['hooks']['scm'].items()
if hook['enabled']]

builder_hooks = [name for name, hook in job_config['hooks']['builders']
.items() if hook['enabled']]

if state == JOB_STATE_NEW:
for hook_type in SUPPORTED_HOOK_TYPES:
if hook_type in job_config and job_config[hook_type]['enabled']:
etcd_cl.write('%s/%s/expect' % (job_base, hook_type), 1)
etcd_cl.write('%s/state' % job_base, 'SCHEDULED')
if not scm_hooks or not builder_hooks or \
(state in (JOB_STATE_NEW, JOB_STATE_SCHEDULED) and
scm_type not in scm_hooks):
return _handle_noop.si(job)()

elif state == JOB_STATE_SCHEDULED:
for hook_type in SUPPORTED_HOOK_TYPES:
if hook_type in job_config and job_config[hook_type]['enabled']:
expected = etcd_cl.read('%s/%s/expect' %
(job_base, hook_type), 1).value + 1
etcd_cl.write('%s/%s/expect' % (job_base, hook_type), expected,
prevValue=expected-1)
else:
# Reset/ Create the new job
return _reset_job.si(job)()

elif state == JOB_STATE_DEPLOY_REQUESTED:
pass
# Can not reset as deploy has started. Let the current job get
# processed before starting new one. Processed job does not mean
# successful deployment. It simply means that deployer has acknowledged
# the new deployment request.
self.retry(exc=ValueError(
'Job exists is in %s state. Can not create new '
'job until existing one gets removed.'))


@app.task
def load_config(*paths):
"""
Loads the totem config using given paths
@using_etcd
def _handle_noop(job, etcd_cl=None, etcd_base=None):
meta = job['meta-info']
job_base = _job_base_location(meta['owner'], meta['repo'], meta['ref'],
etcd_base)
etcd_cl.delete(job_base, recursive=True)
return update_job_state.si(meta['job-id'], JOB_STATE_NOOP, ret_value=job)

:param paths:
:return:
"""
return config.load_config(paths)

@app.task
@using_etcd
def _reset_job(job, etcd_cl=None, etcd_base=None):
meta = job['meta-info']
job_config = job['config']
job_base = _job_base_location(meta['owner'], meta['repo'], meta['ref'],
etcd_base)
etcd_cl.write(job_base+'/job-id', meta['job-id'])
# We will now expect to receive callbacks for new commit.
etcd_cl.write(job_base+'/commit', meta['commit'])
etcd_cl.write('%s/hooks/builder/done' % job_base, False)
ci_hooks = [name for name, hook in job_config['hooks']['ci']
.items() if hook['enabled']]
for ci_hook in ci_hooks:
etcd_cl.write('%s/hooks/ci/%s/done' % (job_base, ci_hook),
False)
return update_job_state.si(meta['job-id'], JOB_STATE_SCHEDULED,
ret_value=job)


def _as_job(job_config, job_id, owner, repo, ref, scm_type, commit=None):
return {
'config': copy.deepcopy(job_config),
'meta-info': {
'owner': owner,
'repo': repo,
'ref': ref,
'commit': commit,
'scm': scm_type,
'job-id': job_id
}
}

0 comments on commit 37a323a

Please sign in to comment.