From 1a78375f4c6984ea74d1290767fe6ce8bdfc020b Mon Sep 17 00:00:00 2001 From: Rohan McGovern Date: Wed, 25 Aug 2021 13:11:09 +1000 Subject: [PATCH] koji: do a connection check before other operations Previously we jump right into spawning a thread pool and farming out some koji requests to threads. Unfortunately, RHEL6 environments suffer from a thread-safety issue: if the very first parsing of a SAN field from an SSL cert in the lifetime of a process happens from multiple threads at once, corruption can occur due to thread-unsafe lazy initialization in the pyasn1 library. This is https://github.com/etingof/pyasn1/issues/53 . See RHELDST-7454 for examples of the resulting crash. Warming up from one thread before we start concurrent operations should be an effective way to work around this and is arguably nicer behavior anyway (e.g. trying to use an invalid koji URL will now give a more understandable exception coming from a single thread). So let's do that. --- src/pushsource/_impl/backend/koji_source.py | 45 ++++++++++++++++++++- tests/koji/conftest.py | 9 +++++ tests/koji/fake_koji.py | 3 ++ tests/koji/test_koji.py | 16 +++++++- 4 files changed, 71 insertions(+), 2 deletions(-) diff --git a/src/pushsource/_impl/backend/koji_source.py b/src/pushsource/_impl/backend/koji_source.py index abe48ac1..6bdd282c 100644 --- a/src/pushsource/_impl/backend/koji_source.py +++ b/src/pushsource/_impl/backend/koji_source.py @@ -5,6 +5,7 @@ from concurrent import futures from six.moves.queue import Queue, Empty +import six from threading import Thread import koji @@ -28,6 +29,10 @@ LOG = logging.getLogger("pushsource") CACHE_LOCK = threading.RLock() +# Arguments used for retry policy. +# Provided so it can be overridden from tests to reduce time spent on retries. +RETRY_ARGS = {} + class ListArchivesCommand(object): def __init__(self, build): @@ -205,7 +210,7 @@ def __init__( self._executor = ( executor or Executors.thread_pool(name="pushsource-koji", max_workers=threads) - .with_retry() + .with_retry(**RETRY_ARGS) .with_cancel_on_shutdown() ) @@ -231,6 +236,28 @@ def _koji_session(self): tls.koji_session = koji.ClientSession(self._url) return tls.koji_session + def _koji_check(self): + # Do a basic connection check with koji. + # If this succeeds, we can be reasonably sure that the koji connection is + # more or less working. + try: + version = self._executor.submit(self._koji_get_version).result() + except Exception as ex: # pylint: disable=broad-except + # TODO: drop this log when py2 support is dropped. + # It's only here because py2 has no exception chaining. + msg = "Communication error with koji at %s" % self._url + LOG.exception(msg) + + six.raise_from(RuntimeError(msg), ex) + + LOG.debug("Connected to koji %s at %s", version, self._url) + + def _koji_get_version(self): + with CACHE_LOCK: + if "koji_version" not in self._cache: + self._cache["koji_version"] = self._koji_session.getVersion() + return self._cache["koji_version"] + def _get_rpm(self, rpm): return self._cache["rpm"][rpm] @@ -573,6 +600,22 @@ def _do_fetch(self, koji_queue, exceptions): raise def __iter__(self): + # Try a (blocking) call to koji before anything else. + # + # The reasons for this are: + # + # - give an error which is hopefully easier to understand if the + # koji connection is completely broken (e.g. caller provides bogus + # URL) + # + # - work around pyasn1 bug https://github.com/etingof/pyasn1/issues/53 + # which affects some (ancient) environments. That thread-safety bug + # can cause parsing of subjectAltName to fail if it first occurs from + # multiple threads simultaneously. Doing a warm-up from one thread + # before we spawn multiple threads is a way to avoid this. + # + self._koji_check() + # Queue holding all requests we need to make to koji. # We try to fetch as much as we can early to make efficient use # of multicall. diff --git a/tests/koji/conftest.py b/tests/koji/conftest.py index eb8b54d7..9d67af3d 100644 --- a/tests/koji/conftest.py +++ b/tests/koji/conftest.py @@ -1,6 +1,8 @@ from pytest import fixture from mock import patch +from pushsource._impl.backend import koji_source + from .fake_koji import FakeKojiController @@ -15,3 +17,10 @@ def fake_koji(): @fixture def koji_dir(tmpdir): yield str(tmpdir.mkdir("koji")) + + +@fixture(autouse=True) +def fast_retry(monkeypatch): + # Force a custom retry policy so that tests which simulate errors + # don't spent as much time retrying as they would in production. + monkeypatch.setattr(koji_source, "RETRY_ARGS", {"max_sleep": 0.0001}) diff --git a/tests/koji/fake_koji.py b/tests/koji/fake_koji.py index 3e914d55..df72893e 100644 --- a/tests/koji/fake_koji.py +++ b/tests/koji/fake_koji.py @@ -158,6 +158,9 @@ def _return_or_raise(self, value): raise value return value + def getVersion(self): + return "fake-version" + def getRPM(self, rpm): return self._return_or_raise(self._ctrl.rpm_data.get(rpm)) diff --git a/tests/koji/test_koji.py b/tests/koji/test_koji.py index c5ccad57..68fa415d 100644 --- a/tests/koji/test_koji.py +++ b/tests/koji/test_koji.py @@ -18,13 +18,27 @@ def test_get_koji(): assert Source.get("koji:https://koji.example.com/") -def test_koji_empty(): +def test_koji_empty(fake_koji): """Empty koji source yields no push items""" source = Source.get("koji:https://koji.example.com/") assert list(source) == [] +def test_koji_connect_error(): + """Source raises a reasonable error if server can't be contacted""" + + # Note: fake_koji fixture not used here, so this will really try to connect + source = Source.get("koji:https://localhost:1234/this-aint-koji") + with raises(RuntimeError) as exc_info: + list(source) + + assert ( + "Communication error with koji at https://localhost:1234/this-aint-koji" + in str(exc_info.value) + ) + + def test_koji_nobuild(fake_koji): """koji source referencing nonexistent build will fail"""