Skip to content
This repository has been archived by the owner on Oct 3, 2018. It is now read-only.

Commit

Permalink
Merge pull request #267 from novafloss/async
Browse files Browse the repository at this point in the history
Async build poll
  • Loading branch information
bersace authored Feb 20, 2017
2 parents e19e7b5 + e28a286 commit c5bacb0
Show file tree
Hide file tree
Showing 7 changed files with 221 additions and 160 deletions.
236 changes: 123 additions & 113 deletions jenkins_epo/extensions/jenkins.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from jenkinsapi.custom_exceptions import UnknownJob

from ..bot import Extension, Error, SkipHead
from ..jenkins import JENKINS
from ..jenkins import JENKINS, RESTClient
from ..repository import Commit, CommitStatus
from ..utils import match, switch_coro

Expand Down Expand Up @@ -120,118 +120,6 @@ def status_for_new_context(self, job, context, queue_empty):
return new_status


class AutoCancelExtension(JenkinsExtension):
stage = '30'

ref_re = re.compile(r'.*origin/(?P<ref>.*)')

def compute_build_ref_and_sha(self, job, build):
try:
jenkins_fullref = build.get_revision_branch()[0]['name']
except IndexError:
for action in build._data['actions']:
if 'parameters' in action:
break
else:
return
for parameter in action['parameters']:
if parameter['name'] == job.revision_param:
break
else:
return
return (
parameter['value'][len('refs/heads/'):],
self.current.last_commit.sha
)
else:
match = self.ref_re.match(jenkins_fullref)
if not match:
return
return (
match.group('ref'),
build.get_revision()
)

def iter_preset_statuses(self, contextes, build):
for context in contextes:
default_status = CommitStatus(
context=context, state='pending', description='Backed',
)
status = self.current.statuses.get(
context, default_status,
)
new_url = status.get('target_url') == build.baseurl
if status.is_queueable or new_url:
status = status.from_build(build)
yield status

def is_build_old(self, build, now, maxage):
seconds = build._data['timestamp'] / 1000.
build_date = datetime.fromtimestamp(seconds)
build_age = now - build_date
if build_date > now:
logger.warning(
"Build %s in the future. Is timezone correct?", build
)
return False
return build_age > maxage

@asyncio.coroutine
def run(self):
now = datetime.now()
maxage = timedelta(hours=2)
current_sha = self.current.last_commit.sha
logger.info("Polling running builds on Jenkins.")
for name, spec in self.current.job_specs.items():
job = self.current.jobs[name]
contextes = job.list_contexts(spec)
builds = reversed(sorted(
job.get_builds(),
key=lambda b: b.baseurl,
))

for build in builds:
build.poll()
yield from switch_coro()
if self.is_build_old(build, now, maxage):
logger.debug("Stopping build iteration for older builds.")
break

if not build._data['building']:
continue

try:
build_ref, build_sha = (
self.compute_build_ref_and_sha(job, build)
)
except TypeError:
logger.debug(
"Can't infer build ref and sha for %s.", build,
)
continue

if build_ref != self.current.head.ref:
continue

if build_sha == current_sha:
commit = self.current.last_commit
preset_statuses = self.iter_preset_statuses(
contextes, build,
)
for status in preset_statuses:
logger.info(
"Preset pending status for %s.", status['context'],
)
commit.maybe_update_status(status)
yield from switch_coro()
continue

commit = Commit(self.current.head.repository, build_sha)
status = CommitStatus(context=job.name).from_build(build)
logger.info("Queuing %s for cancel.", build)
self.current.cancel_queue.append((commit, status))


class CancellerExtension(JenkinsExtension):
stage = '49'

Expand Down Expand Up @@ -375,6 +263,128 @@ def process_error(self, spec, e):
)


class PollExtension(JenkinsExtension):
stage = '30'

ref_re = re.compile(r'.*origin/(?P<ref>.*)')

def compute_build_ref_and_sha(self, job, build):
try:
jenkins_fullref = build.get_revision_branch()[0]['name']
except IndexError:
for action in build._data['actions']:
if 'parameters' in action:
break
else:
return
for parameter in action['parameters']:
if parameter['name'] == job.revision_param:
break
else:
return
return (
parameter['value'][len('refs/heads/'):],
self.current.last_commit.sha
)
else:
match = self.ref_re.match(jenkins_fullref)
if not match:
return
return (
match.group('ref'),
build.get_revision()
)

def iter_preset_statuses(self, contextes, build):
for context in contextes:
default_status = CommitStatus(
context=context, state='pending', description='Backed',
)
status = self.current.statuses.get(
context, default_status,
)
new_url = status.get('target_url') == build.baseurl
if status.is_queueable or new_url:
status = status.from_build(build)
yield status

def is_build_old(self, build):
now = datetime.now()
maxage = timedelta(hours=2)
seconds = build._data['timestamp'] / 1000.
build_date = datetime.fromtimestamp(seconds)
build_age = now - build_date
if build_date > now:
logger.warning(
"Build %s in the future. Is timezone correct?", build
)
return False
return build_age > maxage

@asyncio.coroutine
def poll_job(self, spec):
job = self.current.jobs[spec.name]
contextes = job.list_contexts(spec)
builds = reversed(sorted(
job.get_builds(),
key=lambda b: b.baseurl,
))
for build in builds:
build._data = yield from (
RESTClient(build.baseurl)
.aget(depth=build.depth)
)
yield from switch_coro()
if self.is_build_old(build):
logger.debug("Stop polling %s for older builds.", spec.name)
break

if not build._data['building']:
continue

try:
build_ref, build_sha = (
self.compute_build_ref_and_sha(job, build)
)
except TypeError:
logger.debug(
"Can't infer build ref and sha for %s.", build,
)
continue

if build_ref != self.current.head.ref:
continue

if build_sha == self.current.last_commit.sha:
commit = self.current.last_commit
preset_statuses = self.iter_preset_statuses(
contextes, build,
)
for status in preset_statuses:
logger.info(
"Preset pending status for %s.", status['context'],
)
commit.maybe_update_status(status)
yield from switch_coro()
continue

commit = Commit(self.current.head.repository, build_sha)
status = CommitStatus(context=job.name).from_build(build)
logger.info("Queuing %s for cancel.", build)
self.current.cancel_queue.append((commit, status))

@asyncio.coroutine
def run(self):
logger.info("Polling running builds on Jenkins.")
tasks = []
loop = asyncio.get_event_loop()
for name, spec in self.current.job_specs.items():
tasks.append(
loop.create_task(self.poll_job(spec))
)
yield from asyncio.gather(*tasks)


class Stage(object):
@classmethod
def factory(cls, entry):
Expand Down
7 changes: 5 additions & 2 deletions jenkins_epo/jenkins.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from jenkins_yml import Job as JobSpec
import requests
import yaml
from yarl import URL

from .settings import SETTINGS
from .utils import match, parse_patterns, retry
Expand All @@ -45,9 +46,11 @@ def __call__(self, arg):
def __getattr__(self, name):
return self(name)

def aget(self):
def aget(self, **kw):
session = aiohttp.ClientSession()
url = '%s/api/json' % (self.path)
url = URL('%s/api/json' % (self.path))
if kw:
url = url.with_query(**kw)
logger.debug("GET %s", url)
try:
response = yield from session.get(url, timeout=10)
Expand Down
3 changes: 2 additions & 1 deletion jenkins_epo/script.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,9 @@ def entrypoint(argv=None):
logger.info("Done.")
except bdb.BdbQuit:
logger.info('Graceful exit from debugger.')
except Exception:
except Exception as e:
if SETTINGS.DEBUG:
logger.error("%s: %s", type(e), e)
post_mortem()
else:
logger.exception("Unhandled error:")
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
'help = jenkins_epo.extensions.core:HelpExtension',
'autocancel = jenkins_epo.extensions.core:AutoCancelExtension',
'jenkins-backed = jenkins_epo.extensions.jenkins:BackedExtension', # noqa
'jenkins-autocancel = jenkins_epo.extensions.jenkins:AutoCancelExtension', # noqa
'jenkins-poll = jenkins_epo.extensions.jenkins:PollExtension',
'jenkins-builder = jenkins_epo.extensions.jenkins:BuilderExtension', # noqa
'jenkins-canceller = jenkins_epo.extensions.jenkins:CancellerExtension', # noqa
'jenkins-createjobs = jenkins_epo.extensions.jenkins:CreateJobsExtension', # noqa
Expand Down
Loading

0 comments on commit c5bacb0

Please sign in to comment.