Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion src/pushsource/_impl/backend/koji_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from concurrent import futures
from six.moves.queue import Queue, Empty
import six
from threading import Thread

import koji
Expand All @@ -28,6 +29,10 @@
LOG = logging.getLogger("pushsource")
CACHE_LOCK = threading.RLock()

# Arguments used for retry policy.
# Provided so it can be overridden from tests to reduce time spent on retries.
RETRY_ARGS = {}


class ListArchivesCommand(object):
def __init__(self, build):
Expand Down Expand Up @@ -205,7 +210,7 @@ def __init__(
self._executor = (
executor
or Executors.thread_pool(name="pushsource-koji", max_workers=threads)
.with_retry()
.with_retry(**RETRY_ARGS)
.with_cancel_on_shutdown()
)

Expand All @@ -231,6 +236,28 @@ def _koji_session(self):
tls.koji_session = koji.ClientSession(self._url)
return tls.koji_session

def _koji_check(self):
# Do a basic connection check with koji.
# If this succeeds, we can be reasonably sure that the koji connection is
# more or less working.
try:
version = self._executor.submit(self._koji_get_version).result()
except Exception as ex: # pylint: disable=broad-except
# TODO: drop this log when py2 support is dropped.
# It's only here because py2 has no exception chaining.
msg = "Communication error with koji at %s" % self._url
LOG.exception(msg)

six.raise_from(RuntimeError(msg), ex)

LOG.debug("Connected to koji %s at %s", version, self._url)

def _koji_get_version(self):
with CACHE_LOCK:
if "koji_version" not in self._cache:
self._cache["koji_version"] = self._koji_session.getVersion()
return self._cache["koji_version"]

def _get_rpm(self, rpm):
return self._cache["rpm"][rpm]

Expand Down Expand Up @@ -573,6 +600,22 @@ def _do_fetch(self, koji_queue, exceptions):
raise

def __iter__(self):
# Try a (blocking) call to koji before anything else.
#
# The reasons for this are:
#
# - give an error which is hopefully easier to understand if the
# koji connection is completely broken (e.g. caller provides bogus
# URL)
#
# - work around pyasn1 bug https://github.com/etingof/pyasn1/issues/53
# which affects some (ancient) environments. That thread-safety bug
# can cause parsing of subjectAltName to fail if it first occurs from
# multiple threads simultaneously. Doing a warm-up from one thread
# before we spawn multiple threads is a way to avoid this.
#
self._koji_check()

# Queue holding all requests we need to make to koji.
# We try to fetch as much as we can early to make efficient use
# of multicall.
Expand Down
9 changes: 9 additions & 0 deletions tests/koji/conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from pytest import fixture
from mock import patch

from pushsource._impl.backend import koji_source

from .fake_koji import FakeKojiController


Expand All @@ -15,3 +17,10 @@ def fake_koji():
@fixture
def koji_dir(tmpdir):
yield str(tmpdir.mkdir("koji"))


@fixture(autouse=True)
def fast_retry(monkeypatch):
# Force a custom retry policy so that tests which simulate errors
# don't spent as much time retrying as they would in production.
monkeypatch.setattr(koji_source, "RETRY_ARGS", {"max_sleep": 0.0001})
3 changes: 3 additions & 0 deletions tests/koji/fake_koji.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ def _return_or_raise(self, value):
raise value
return value

def getVersion(self):
return "fake-version"

def getRPM(self, rpm):
return self._return_or_raise(self._ctrl.rpm_data.get(rpm))

Expand Down
16 changes: 15 additions & 1 deletion tests/koji/test_koji.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,27 @@ def test_get_koji():
assert Source.get("koji:https://koji.example.com/")


def test_koji_empty():
def test_koji_empty(fake_koji):
"""Empty koji source yields no push items"""

source = Source.get("koji:https://koji.example.com/")
assert list(source) == []


def test_koji_connect_error():
"""Source raises a reasonable error if server can't be contacted"""

# Note: fake_koji fixture not used here, so this will really try to connect
source = Source.get("koji:https://localhost:1234/this-aint-koji")
with raises(RuntimeError) as exc_info:
list(source)

assert (
"Communication error with koji at https://localhost:1234/this-aint-koji"
in str(exc_info.value)
)


def test_koji_nobuild(fake_koji):
"""koji source referencing nonexistent build will fail"""

Expand Down