Skip to content
This repository has been archived by the owner on Oct 3, 2018. It is now read-only.

Commit

Permalink
Merge pull request #268 from novafloss/async
Browse files Browse the repository at this point in the history
Faster Jenkins poll
  • Loading branch information
bersace committed Feb 20, 2017
2 parents c5bacb0 + c90e639 commit 2f69a42
Show file tree
Hide file tree
Showing 7 changed files with 303 additions and 259 deletions.
119 changes: 36 additions & 83 deletions jenkins_epo/extensions/jenkins.py
Expand Up @@ -14,14 +14,12 @@

import asyncio
from collections import OrderedDict
from datetime import datetime, timedelta
import logging
import re

from jenkinsapi.custom_exceptions import UnknownJob

from ..bot import Extension, Error, SkipHead
from ..jenkins import JENKINS, RESTClient
from ..jenkins import JENKINS
from ..repository import Commit, CommitStatus
from ..utils import match, switch_coro

Expand Down Expand Up @@ -79,6 +77,7 @@ def run(self):
rebuild_failed=self.current.rebuild_failed
)
queue_empty = JENKINS.is_queue_empty()
yield from switch_coro()
toqueue_contexts = []
for context in not_built:
logger.debug("Computing new status for %s.", spec)
Expand All @@ -101,6 +100,7 @@ def run(self):
target_url=job.baseurl,
)
)
yield from switch_coro()

def status_for_new_context(self, job, context, queue_empty):
new_status = CommitStatus(target_url=job.baseurl, context=context)
Expand Down Expand Up @@ -217,16 +217,25 @@ def process_job_specs(self):
if update:
yield JENKINS.update_job, spec

@asyncio.coroutine
def fetch_job(self, name):
asyncio.Task.current_task().logging_id = self.current.head.sha[:4]
if name in self.current.jobs:
return
try:
self.current.jobs[name] = yield from JENKINS.aget_job(name)
except UnknownJob:
pass

@asyncio.coroutine
def run(self):
logger.info("Fetching jobs from Jenkins.")
for name in self.current.job_specs:
if name in self.current.jobs:
continue
try:
self.current.jobs[name] = yield from JENKINS.aget_job(name)
except UnknownJob:
pass
loop = asyncio.get_event_loop()
tasks = [
loop.create_task(self.fetch_job(name))
for name in self.current.job_specs
]
yield from asyncio.gather(*tasks)

for action, spec in self.process_job_specs():
job = None
Expand Down Expand Up @@ -266,35 +275,6 @@ def process_error(self, spec, e):
class PollExtension(JenkinsExtension):
stage = '30'

ref_re = re.compile(r'.*origin/(?P<ref>.*)')

def compute_build_ref_and_sha(self, job, build):
try:
jenkins_fullref = build.get_revision_branch()[0]['name']
except IndexError:
for action in build._data['actions']:
if 'parameters' in action:
break
else:
return
for parameter in action['parameters']:
if parameter['name'] == job.revision_param:
break
else:
return
return (
parameter['value'][len('refs/heads/'):],
self.current.last_commit.sha
)
else:
match = self.ref_re.match(jenkins_fullref)
if not match:
return
return (
match.group('ref'),
build.get_revision()
)

def iter_preset_statuses(self, contextes, build):
for context in contextes:
default_status = CommitStatus(
Expand All @@ -308,54 +288,26 @@ def iter_preset_statuses(self, contextes, build):
status = status.from_build(build)
yield status

def is_build_old(self, build):
now = datetime.now()
maxage = timedelta(hours=2)
seconds = build._data['timestamp'] / 1000.
build_date = datetime.fromtimestamp(seconds)
build_age = now - build_date
if build_date > now:
logger.warning(
"Build %s in the future. Is timezone correct?", build
)
return False
return build_age > maxage

@asyncio.coroutine
def poll_job(self, spec):
asyncio.Task.current_task().logging_id = self.current.head.sha[:4]
job = self.current.jobs[spec.name]
payload = yield from job.fetch_builds()
contextes = job.list_contexts(spec)
builds = reversed(sorted(
job.get_builds(),
key=lambda b: b.baseurl,
))
for build in builds:
build._data = yield from (
RESTClient(build.baseurl)
.aget(depth=build.depth)
)
yield from switch_coro()
if self.is_build_old(build):
logger.debug("Stop polling %s for older builds.", spec.name)
break

if not build._data['building']:
for build in job.process_builds(payload):
if not build.is_running:
continue

try:
build_ref, build_sha = (
self.compute_build_ref_and_sha(job, build)
)
except TypeError:
logger.debug(
"Can't infer build ref and sha for %s.", build,
)
continue
if build.is_outdated:
break

if build_ref != self.current.head.ref:
if build.ref != self.current.head.ref:
continue

if build_sha == self.current.last_commit.sha:
try:
if build.sha == self.current.head.sha:
continue
except Exception:
commit = self.current.last_commit
preset_statuses = self.iter_preset_statuses(
contextes, build,
Expand All @@ -367,11 +319,12 @@ def poll_job(self, spec):
commit.maybe_update_status(status)
yield from switch_coro()
continue

commit = Commit(self.current.head.repository, build_sha)
status = CommitStatus(context=job.name).from_build(build)
logger.info("Queuing %s for cancel.", build)
self.current.cancel_queue.append((commit, status))
else:
commit = Commit(self.current.head.repository, build.sha)
status = CommitStatus(context=job.name).from_build(build)
logger.info("Queuing %s for cancel.", build)
self.current.cancel_queue.append((commit, status))
logger.debug("Polling %s done.", spec.name)

@asyncio.coroutine
def run(self):
Expand Down
119 changes: 103 additions & 16 deletions jenkins_epo/jenkins.py
Expand Up @@ -13,15 +13,16 @@
# jenkins-epo. If not, see <http://www.gnu.org/licenses/>.


import ast
import asyncio
from datetime import datetime
from datetime import datetime, timedelta
from itertools import product
import logging
import re

import aiohttp
from jenkinsapi.jenkinsbase import JenkinsBase
from jenkinsapi.build import Build
from jenkinsapi.build import Build as JenkinsBuild
from jenkinsapi.jenkins import Jenkins, Requester
from jenkins_yml import Job as JobSpec
import requests
Expand All @@ -46,18 +47,23 @@ def __call__(self, arg):
def __getattr__(self, name):
return self(name)

def aget(self, **kw):
@retry
def afetch(self, **kw):
session = aiohttp.ClientSession()
url = URL('%s/api/json' % (self.path))
url = URL(self.path)
if kw:
url = url.with_query(**kw)
logger.debug("GET %s", url)
try:
response = yield from session.get(url, timeout=10)
payload = yield from response.json()
payload = yield from response.read()
finally:
yield from session.close()
return payload
return payload.decode('utf-8')

def aget(self, **kw):
payload = yield from self.api.python.afetch(**kw)
return ast.literal_eval(payload)


class VerboseRequester(Requester):
Expand Down Expand Up @@ -110,7 +116,6 @@ def load(self):

@retry
def is_queue_empty(self):
logging.debug("GET %s queue.", SETTINGS.JENKINS_URL)
queue = self.get_queue()
queue.poll()
data = queue._data
Expand All @@ -132,8 +137,9 @@ def get_job(self, name):
def aget_job(self, name):
self.load()
instance = self._instance.get_job(name)
data = yield from RESTClient(instance.baseurl).aget()
instance._data = data
client = RESTClient(instance.baseurl)
instance._data = yield from client.aget()
instance._config = yield from client('config.xml').afetch()
return Job.factory(instance)

DESCRIPTION_TMPL = """\
Expand Down Expand Up @@ -188,6 +194,73 @@ def update_job(self, job_spec):
JENKINS = LazyJenkins()


class Build(object):
def __init__(self, job, payload, api_instance=None):
self.job = job
self.payload = payload
self._instance = api_instance
self.params = self.process_params(payload)

@staticmethod
def process_params(payload):
for action in payload.get('actions', []):
if 'parameters' in action:
break
else:
return {}

return {
p['name']: p['value']
for p in action['parameters']
if 'value' in p
}

def __getattr__(self, name):
return getattr(self._instance, name)

def __str__(self):
return str(self._instance)

@property
def is_outdated(self):
now = datetime.now()
maxage = timedelta(hours=2)
seconds = self.payload['timestamp'] / 1000.
build_date = datetime.fromtimestamp(seconds)
if build_date > now:
logger.warning(
"Build %s in the future. Is timezone correct?", self
)
return False
build_age = now - build_date
return build_age > maxage

@property
def is_running(self):
return self.payload['building']

_ref_re = re.compile(r'.*origin/(?P<ref>.*)')

@property
def ref(self):
try:
fullref = self.payload['lastBuiltRevision']['branch']['name']
except (TypeError, KeyError):
return self.params[self.job.revision_param][len('refs/heads/'):]
else:
match = self._ref_re.match(fullref)
if not match:
raise Exception("Unknown branch %s" % fullref)
return match.group('ref')

@property
def sha(self):
try:
return self.payload['lastBuiltRevision']['branch']['SHA1']
except (TypeError, KeyError):
raise Exception("No SHA1 yet.")


class Job(object):
jobs_filter = parse_patterns(SETTINGS.JOBS)
embedded_data_re = re.compile(
Expand Down Expand Up @@ -281,13 +354,27 @@ def node_param(self):
return self._node_param

@asyncio.coroutine
def update_data_async(self):
client = RESTClient(self._instance.baseurl)
self._instance._data = yield from client.aget()

def get_builds(self):
for number, url in self.get_build_dict().items():
yield Build(url, number, self._instance)
def fetch_builds(self):
tree = "builds[" + (
"actions[" + (
"parameters[name,value],"
"lastBuiltRevision[branch[name,SHA1]]"
) + "],"
"building,duration,number,result,timestamp,url"
) + "]"
payload = yield from RESTClient(self.baseurl).aget(tree=tree)
return payload['builds']

def process_builds(self, payload):
payload = reversed(sorted(payload, key=lambda b: b['number']))
for entry in payload:
api_instance = JenkinsBuild(
url=entry['url'],
buildno=entry['number'],
job=self._instance
)
api_instance._data = entry
yield Build(self, entry, api_instance)


class FreestyleJob(Job):
Expand Down
2 changes: 1 addition & 1 deletion jenkins_epo/workers.py
Expand Up @@ -75,7 +75,7 @@ def worker(self, id_):
except CancelledError:
logger.warn("Cancel of %s", item)
except Exception as e:
if SETTINGS.DEBUG:
if SETTINGS.VERBOSE:
logger.exception("Failed to process %s: %s", item, e)
else:
logger.error("Failed to process %s: %s", item, e)
Expand Down
2 changes: 2 additions & 0 deletions tests/extensions/test_create_jobs.py
Expand Up @@ -226,6 +226,7 @@ def test_jenkins_create_success(mocker):

ext = CreateJobsExtension('createjob', Mock())
ext.current = ext.bot.current
ext.current.head.sha = 'cafed0d0'
ext.current.head.repository.jobs = {}
ext.current.job_specs = dict(new=JobSpec('new', dict(periodic=True)))
ext.current.jobs = {}
Expand Down Expand Up @@ -254,6 +255,7 @@ def test_jenkins_fails_existing(mocker):
ext = CreateJobsExtension('createjob', Mock())
ext.current = ext.bot.current
ext.current.errors = []
ext.current.head.sha = 'cafed0d0'
ext.current.head.repository.jobs = {'job': Mock()}
ext.current.job_specs = dict(job=JobSpec.factory('job', 'toto'))
ext.current.jobs = {'job': Mock()}
Expand Down

0 comments on commit 2f69a42

Please sign in to comment.