diff --git a/src/pushsource/_impl/backend/koji_source.py b/src/pushsource/_impl/backend/koji_source.py index abe48ac1..6bdd282c 100644 --- a/src/pushsource/_impl/backend/koji_source.py +++ b/src/pushsource/_impl/backend/koji_source.py @@ -5,6 +5,7 @@ from concurrent import futures from six.moves.queue import Queue, Empty +import six from threading import Thread import koji @@ -28,6 +29,10 @@ LOG = logging.getLogger("pushsource") CACHE_LOCK = threading.RLock() +# Arguments used for retry policy. +# Provided so it can be overridden from tests to reduce time spent on retries. +RETRY_ARGS = {} + class ListArchivesCommand(object): def __init__(self, build): @@ -205,7 +210,7 @@ def __init__( self._executor = ( executor or Executors.thread_pool(name="pushsource-koji", max_workers=threads) - .with_retry() + .with_retry(**RETRY_ARGS) .with_cancel_on_shutdown() ) @@ -231,6 +236,28 @@ def _koji_session(self): tls.koji_session = koji.ClientSession(self._url) return tls.koji_session + def _koji_check(self): + # Do a basic connection check with koji. + # If this succeeds, we can be reasonably sure that the koji connection is + # more or less working. + try: + version = self._executor.submit(self._koji_get_version).result() + except Exception as ex: # pylint: disable=broad-except + # TODO: drop this log when py2 support is dropped. + # It's only here because py2 has no exception chaining. + msg = "Communication error with koji at %s" % self._url + LOG.exception(msg) + + six.raise_from(RuntimeError(msg), ex) + + LOG.debug("Connected to koji %s at %s", version, self._url) + + def _koji_get_version(self): + with CACHE_LOCK: + if "koji_version" not in self._cache: + self._cache["koji_version"] = self._koji_session.getVersion() + return self._cache["koji_version"] + def _get_rpm(self, rpm): return self._cache["rpm"][rpm] @@ -573,6 +600,22 @@ def _do_fetch(self, koji_queue, exceptions): raise def __iter__(self): + # Try a (blocking) call to koji before anything else. + # + # The reasons for this are: + # + # - give an error which is hopefully easier to understand if the + # koji connection is completely broken (e.g. caller provides bogus + # URL) + # + # - work around pyasn1 bug https://github.com/etingof/pyasn1/issues/53 + # which affects some (ancient) environments. That thread-safety bug + # can cause parsing of subjectAltName to fail if it first occurs from + # multiple threads simultaneously. Doing a warm-up from one thread + # before we spawn multiple threads is a way to avoid this. + # + self._koji_check() + # Queue holding all requests we need to make to koji. # We try to fetch as much as we can early to make efficient use # of multicall. diff --git a/tests/koji/conftest.py b/tests/koji/conftest.py index eb8b54d7..9d67af3d 100644 --- a/tests/koji/conftest.py +++ b/tests/koji/conftest.py @@ -1,6 +1,8 @@ from pytest import fixture from mock import patch +from pushsource._impl.backend import koji_source + from .fake_koji import FakeKojiController @@ -15,3 +17,10 @@ def fake_koji(): @fixture def koji_dir(tmpdir): yield str(tmpdir.mkdir("koji")) + + +@fixture(autouse=True) +def fast_retry(monkeypatch): + # Force a custom retry policy so that tests which simulate errors + # don't spent as much time retrying as they would in production. + monkeypatch.setattr(koji_source, "RETRY_ARGS", {"max_sleep": 0.0001}) diff --git a/tests/koji/fake_koji.py b/tests/koji/fake_koji.py index 3e914d55..df72893e 100644 --- a/tests/koji/fake_koji.py +++ b/tests/koji/fake_koji.py @@ -158,6 +158,9 @@ def _return_or_raise(self, value): raise value return value + def getVersion(self): + return "fake-version" + def getRPM(self, rpm): return self._return_or_raise(self._ctrl.rpm_data.get(rpm)) diff --git a/tests/koji/test_koji.py b/tests/koji/test_koji.py index c5ccad57..68fa415d 100644 --- a/tests/koji/test_koji.py +++ b/tests/koji/test_koji.py @@ -18,13 +18,27 @@ def test_get_koji(): assert Source.get("koji:https://koji.example.com/") -def test_koji_empty(): +def test_koji_empty(fake_koji): """Empty koji source yields no push items""" source = Source.get("koji:https://koji.example.com/") assert list(source) == [] +def test_koji_connect_error(): + """Source raises a reasonable error if server can't be contacted""" + + # Note: fake_koji fixture not used here, so this will really try to connect + source = Source.get("koji:https://localhost:1234/this-aint-koji") + with raises(RuntimeError) as exc_info: + list(source) + + assert ( + "Communication error with koji at https://localhost:1234/this-aint-koji" + in str(exc_info.value) + ) + + def test_koji_nobuild(fake_koji): """koji source referencing nonexistent build will fail"""