Skip to content

Commit

Permalink
Index Job / persist job
Browse files Browse the repository at this point in the history
  • Loading branch information
sukrit007 committed Nov 14, 2014
1 parent 37a323a commit f1790df
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 11 deletions.
17 changes: 17 additions & 0 deletions orchestrator/etcd.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,20 @@ def outer(*args, **kwargs):
kwargs.setdefault('etcd_base', TOTEM_ETCD_SETTINGS['base'])
return func(*args, **kwargs)
return outer


def safe_delete(etcd_cl, key, **kwargs):
"""
Performs safe deleteion of a given etcd key.
:param func:
:param key:
:param kwargs:
:return:
"""

try:
etcd_cl.delete(key, **kwargs)
except KeyError:
# Ignore non existent key
pass
28 changes: 19 additions & 9 deletions orchestrator/tasks/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
JOB_STATE_SCHEDULED, JOB_STATE_DEPLOY_REQUESTED, JOB_STATE_NOOP
from conf.celeryconfig import CLUSTER_NAME
from orchestrator.celery import app
from orchestrator.etcd import using_etcd
from orchestrator.etcd import using_etcd, safe_delete
from orchestrator.services import config
from orchestrator.services.distributed_lock import LockService, \
ResourceLockedException
from orchestrator.tasks.search import update_job_state
from orchestrator.tasks.search import index_job
from orchestrator.tasks.common import async_wait

__author__ = 'sukrit'
Expand Down Expand Up @@ -134,7 +134,7 @@ def create_job(self, job_config, owner, repo, ref, scm_type, commit=None,
job_id = etcd_cl.read(job_base+'/job-id').value
except KeyError:
state = JOB_STATE_NEW
job_id = uuid.uuid4()
job_id = str(uuid.uuid4())

job = _as_job(job_config, job_id, owner, repo, ref, scm_type, commit)

Expand Down Expand Up @@ -168,31 +168,41 @@ def create_job(self, job_config, owner, repo, ref, scm_type, commit=None,
@app.task
@using_etcd
def _handle_noop(job, etcd_cl=None, etcd_base=None):
job = copy.deepcopy(job)
job['state'] = JOB_STATE_NOOP
meta = job['meta-info']
job_base = _job_base_location(meta['owner'], meta['repo'], meta['ref'],
etcd_base)
etcd_cl.delete(job_base, recursive=True)
return update_job_state.si(meta['job-id'], JOB_STATE_NOOP, ret_value=job)
safe_delete(etcd_cl, job_base, recursive=True)
return index_job.si(job, ret_value=job)


@app.task
def _reset_job(job):
job = copy.deepcopy(job)
job['state'] = JOB_STATE_SCHEDULED
return (
_save_job.si(job) |
index_job.si(job, ret_value=job)
)()


@app.task
@using_etcd
def _reset_job(job, etcd_cl=None, etcd_base=None):
def _save_job(job, etcd_cl=None, etcd_base=None):
meta = job['meta-info']
job_config = job['config']
job_base = _job_base_location(meta['owner'], meta['repo'], meta['ref'],
etcd_base)
etcd_cl.write(job_base+'/job-id', meta['job-id'])
# We will now expect to receive callbacks for new commit.
etcd_cl.write(job_base+'/commit', meta['commit'])
etcd_cl.write('%s/hooks/builder/done' % job_base, False)
ci_hooks = [name for name, hook in job_config['hooks']['ci']
.items() if hook['enabled']]
for ci_hook in ci_hooks:
etcd_cl.write('%s/hooks/ci/%s/done' % (job_base, ci_hook),
False)
return update_job_state.si(meta['job-id'], JOB_STATE_SCHEDULED,
ret_value=job)
etcd_cl.write(job_base+'/state', job['state'])


def _as_job(job_config, job_id, owner, repo, ref, scm_type, commit=None):
Expand Down
5 changes: 3 additions & 2 deletions orchestrator/tasks/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@

@app.task
@orch_search
def index_job(deployment, es=None, idx=None):
def index_job(job, ret_value=None, es=None, idx=None):
"""
Creates a new deployment
:param deployment: Dictionary containing deployment parameters
"""
return es.index(idx, TYPE_JOBS, deployment, id=deployment['id'])
return ret_value or \
es.index(idx, TYPE_JOBS, job, id=job['meta-info']['job-id'])


@app.task
Expand Down

0 comments on commit f1790df

Please sign in to comment.