diff --git a/CHANGELOG.md b/CHANGELOG.md index cee00b5d..ec2f7512 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 now also validated. - `ModuleMdPushItem.name` now uses the NSVCA of a module rather than the filename of a loaded modulemd file, when this metadata is available. +- `errata` source now includes FTP paths in the `dest` field of generated push items, + where applicable. ## [2.6.0] - 2021-05-27 diff --git a/src/pushsource/_impl/backend/errata_source/__init__.py b/src/pushsource/_impl/backend/errata_source/__init__.py new file mode 100644 index 00000000..1785f29b --- /dev/null +++ b/src/pushsource/_impl/backend/errata_source/__init__.py @@ -0,0 +1 @@ +from .errata_source import ErrataSource diff --git a/src/pushsource/_impl/backend/errata_source/errata_client.py b/src/pushsource/_impl/backend/errata_source/errata_client.py new file mode 100644 index 00000000..fc333dd1 --- /dev/null +++ b/src/pushsource/_impl/backend/errata_source/errata_client.py @@ -0,0 +1,56 @@ +import logging +import threading +from functools import partial + +from six.moves import xmlrpc_client +from more_executors import Executors +from more_executors.futures import f_zip, f_map + +from ...compat_attr import attr + +LOG = logging.getLogger("pushsource.errata_client") + + +@attr.s() +class ErrataRaw(object): + # Helper to collect raw responses of all ET APIs for a single advisory + advisory_cdn_metadata = attr.ib(type=dict) + advisory_cdn_file_list = attr.ib(type=dict) + ftp_paths = attr.ib(type=dict) + + +class ErrataClient(object): + def __init__(self, threads, url): + self._executor = Executors.thread_pool(max_workers=threads).with_retry() + self._url = url + self._tls = threading.local() + + self._get_advisory_cdn_metadata = partial( + self._call_et, "get_advisory_cdn_metadata" + ) + self._get_advisory_cdn_file_list = partial( + self._call_et, "get_advisory_cdn_file_list" + ) + self._get_ftp_paths = partial(self._call_et, "get_ftp_paths") + + @property + def _errata_service(self): + # XML-RPC client connected to errata_service. + # Each thread uses a separate client. + if not hasattr(self._tls, "errata_service"): + LOG.debug("Creating XML-RPC client for Errata Tool: %s", self._url) + self._tls.errata_service = xmlrpc_client.ServerProxy(self._url) + return self._tls.errata_service + + def get_raw_f(self, advisory_id): + """Returns Future[ErrataRaw] holding all ET responses for a particular advisory.""" + all_responses = f_zip( + self._executor.submit(self._get_advisory_cdn_metadata, advisory_id), + self._executor.submit(self._get_advisory_cdn_file_list, advisory_id), + self._executor.submit(self._get_ftp_paths, advisory_id), + ) + return f_map(all_responses, lambda tup: ErrataRaw(*tup)) + + def _call_et(self, method_name, advisory_id): + method = getattr(self._errata_service, method_name) + return method(advisory_id) diff --git a/src/pushsource/_impl/backend/errata_source.py b/src/pushsource/_impl/backend/errata_source/errata_source.py similarity index 66% rename from src/pushsource/_impl/backend/errata_source.py rename to src/pushsource/_impl/backend/errata_source/errata_source.py index 45d213b0..46a60e7b 100644 --- a/src/pushsource/_impl/backend/errata_source.py +++ b/src/pushsource/_impl/backend/errata_source/errata_source.py @@ -1,18 +1,17 @@ import os -import threading import logging from concurrent import futures import six from six.moves.urllib import parse -from six.moves.xmlrpc_client import ServerProxy from more_executors import Executors -from more_executors.futures import f_map, f_zip -from .. import compat_attr as attr -from ..source import Source -from ..model import ErratumPushItem -from ..helpers import list_argument +from .errata_client import ErrataClient + +from ... import compat_attr as attr +from ...source import Source +from ...model import ErratumPushItem +from ...helpers import list_argument LOG = logging.getLogger("pushsource") @@ -20,15 +19,7 @@ class ErrataSource(Source): """Uses an advisory from Errata Tool as the source of push items.""" - def __init__( - self, - url, - errata, - target="cdn", - koji_source=None, - threads=4, - timeout=60 * 60 * 4, - ): + def __init__(self, url, errata, koji_source=None, threads=4, timeout=60 * 60 * 4): """Create a new source. Parameters: @@ -41,13 +32,6 @@ def __init__( If a single string is given, multiple IDs may be comma-separated. - target (str) - The target type used when querying Errata Tool. - The target type may influence the content produced, - depending on the Errata Tool settings. - - Valid values include at least: "cdn", "rhn", "ftp". - koji_source (str) URL of a koji source associated with this Errata Tool instance. @@ -62,16 +46,18 @@ def __init__( """ self._url = url self._errata = list_argument(errata) - self._executor = Executors.thread_pool(max_workers=threads).with_retry() + self._client = ErrataClient(threads=threads, url=self._errata_service_url) + + # This executor doesn't use retry because koji & ET executors already do that. + self._executor = Executors.thread_pool(max_workers=threads) + # We set aside a separate thread pool for koji so that there are separate # queues for ET and koji calls, yet we avoid creating a new thread pool for # each koji source. self._koji_executor = Executors.thread_pool(max_workers=threads).with_retry() self._koji_cache = {} self._koji_source_url = koji_source - self._target = target self._timeout = timeout - self._tls = threading.local() @property def _errata_service_url(self): @@ -98,49 +84,33 @@ def _koji_source(self, **kwargs): **kwargs ) - @property - def _errata_service(self): - # XML-RPC client connected to errata_service. - # Each thread uses a separate client. - if not hasattr(self._tls, "errata_service"): - url = self._errata_service_url - LOG.debug("Creating XML-RPC client for Errata Tool: %s", url) - self._tls.errata_service = ServerProxy(url) - return self._tls.errata_service - @property def _advisory_ids(self): # TODO: other cases (comma-separated; plain string) return self._errata - def _get_advisory_metadata(self, advisory_id): - # TODO: use CDN API even for RHN? - return self._errata_service.get_advisory_cdn_metadata(advisory_id) - - def _get_advisory_file_list(self, advisory_id): - # TODO: look at the target hint - return self._errata_service.get_advisory_cdn_file_list(advisory_id) - - def _push_items_from_raw(self, metadata_and_file_list): - (metadata, file_list) = metadata_and_file_list + def _push_items_from_raw(self, raw): + erratum = ErratumPushItem._from_data(raw.advisory_cdn_metadata) - erratum = ErratumPushItem._from_data(metadata) - files = self._push_items_from_files(erratum, file_list) + rpms = self._push_items_from_rpms(erratum, raw.advisory_cdn_file_list) - # The erratum should go to all the same destinations as the files. + # The erratum should go to all the same destinations as the rpms, + # before FTP paths are added. erratum_dest = set(erratum.dest or []) - for file in files: - for dest in file.dest: + for rpm in rpms: + for dest in rpm.dest: erratum_dest.add(dest) - erratum = attr.evolve(erratum, dest=sorted(erratum_dest)) - return [erratum] + files + # Enrich RPM push items with their FTP paths if any. + rpms = self._add_ftp_paths(rpms, raw.ftp_paths) + + return [erratum] + rpms - def _push_items_from_files(self, erratum, file_list): + def _push_items_from_rpms(self, erratum, rpm_list): out = [] - for build_nvr, build_info in six.iteritems(file_list): + for build_nvr, build_info in six.iteritems(rpm_list): out.extend(self._rpm_push_items_from_build(erratum, build_info)) out.extend( self._module_push_items_from_build(erratum, build_nvr, build_info) @@ -217,40 +187,52 @@ def _rpm_push_items_from_build(self, erratum, build_info): return out - def __iter__(self): - # Get file list of all advisories first. + def _add_ftp_paths(self, rpm_items, ftp_paths): + # ftp_paths structure is like this: + # + # { + # "xorg-x11-server-1.20.4-16.el7_9": { + # "rpms": { + # "xorg-x11-server-1.20.4-16.el7_9.src.rpm": [ + # "/ftp/pub/redhat/linux/enterprise/7Client/en/os/SRPMS/", + # "/ftp/pub/redhat/linux/enterprise/7ComputeNode/en/os/SRPMS/", + # "/ftp/pub/redhat/linux/enterprise/7Server/en/os/SRPMS/", + # "/ftp/pub/redhat/linux/enterprise/7Workstation/en/os/SRPMS/", + # ] + # }, + # "sig_key": "fd431d51", + # } + # } + # + # We only care about the rpm => ftp path mapping, which should be added onto + # our existing rpm push items if any exist. # - # We get the file list first because it contains koji build NVRs, - # so by getting these first we can issue koji build queries and advisory - # metadata queries concurrently. - file_list_fs = [ - self._executor.submit(self._get_advisory_file_list, advisory_id) - for advisory_id in self._advisory_ids - ] - - # Then get metadata. - metadata_fs = [ - self._executor.submit(self._get_advisory_metadata, advisory_id) - for advisory_id in self._advisory_ids - ] - - # Zip up the metadata and file list for each advisory, since both are - # needed together in order to make push items. - advisory_fs = [ - f_zip(metadata_f, file_list_f) - for (metadata_f, file_list_f) in zip(metadata_fs, file_list_fs) - ] + rpm_to_paths = {} + for rpm_map in ftp_paths.values(): + for (rpm_name, paths) in (rpm_map.get("rpms") or {}).items(): + rpm_to_paths[rpm_name] = paths + + out = [] + for item in rpm_items: + paths = rpm_to_paths.get(item.name) or [] + item = attr.evolve(item, dest=item.dest + paths) + out.append(item) + + return out + + def __iter__(self): + # Get raw ET responses for all errata. + raw_fs = [self._client.get_raw_f(id) for id in self._advisory_ids] # Convert them to lists of push items - advisory_push_items_fs = [ - f_map(f, self._push_items_from_raw) for f in advisory_fs - ] + push_items_fs = [] + for f in futures.as_completed(raw_fs, timeout=self._timeout): + push_items_fs.append( + self._executor.submit(self._push_items_from_raw, f.result()) + ) - completed_fs = futures.as_completed( - advisory_push_items_fs, timeout=self._timeout - ) + completed_fs = futures.as_completed(push_items_fs, timeout=self._timeout) for f in completed_fs: - # If an exception occurred, this is where it will be raised. for pushitem in f.result(): yield pushitem diff --git a/tests/errata/conftest.py b/tests/errata/conftest.py index b6950414..c20bbee7 100644 --- a/tests/errata/conftest.py +++ b/tests/errata/conftest.py @@ -8,7 +8,9 @@ @fixture def fake_errata_tool(): controller = FakeErrataToolController() - with patch("pushsource._impl.backend.errata_source.ServerProxy") as mock_proxy: + with patch( + "pushsource._impl.backend.errata_source.errata_client.xmlrpc_client.ServerProxy" + ) as mock_proxy: mock_proxy.side_effect = controller.proxy yield controller diff --git a/tests/errata/data/RHEA-2020:0346.yaml b/tests/errata/data/RHEA-2020:0346.yaml index 7d29e08f..2b32d7e4 100644 --- a/tests/errata/data/RHEA-2020:0346.yaml +++ b/tests/errata/data/RHEA-2020:0346.yaml @@ -1,4 +1,5 @@ advisory_id: RHEA-2020:0346 + cdn_file_list: postgresql-12-8010120191120141335.e4e244f9: checksums: @@ -523,6 +524,7 @@ cdn_file_list: postgresql-upgrade-devel-debuginfo-12.1-2.module+el8.1.1+4794+c82b6e09.x86_64.rpm: - rhel-8-for-x86_64-appstream-debug-rpms__8 sig_key: fd431d51 + cdn_metadata: description: 'This enhancement update adds the postgresql:12 module stream to Red Hat Enterprise Linux 8. (BZ#1721822) @@ -2235,3 +2237,16 @@ cdn_metadata: type: enhancement updated: 2020-02-04 11:39:30 UTC version: '4' + +ftp_paths: + postgresql-12-8010120191120141335.e4e244f9: + sig_key: fd431d51 + rpms: + pgaudit-1.4.0-4.module+el8.1.1+4794+c82b6e09.src.rpm: + - /ftp/pub/redhat/linux/enterprise/AppStream-8.1.1.MAIN/en/os/SRPMS/ + postgresql-12.1-2.module+el8.1.1+4794+c82b6e09.src.rpm: + - /ftp/pub/redhat/linux/enterprise/AppStream-8.1.1.MAIN/en/os/SRPMS/ + postgres-decoderbufs-0.10.0-2.module+el8.1.1+4794+c82b6e09.src.rpm: + - /ftp/pub/redhat/linux/enterprise/AppStream-8.1.1.MAIN/en/os/SRPMS/ + modules: + - /ftp/pub/redhat/linux/enterprise/AppStream-8.1.1.MAIN/en/os/modules/ diff --git a/tests/errata/fake_errata_tool.py b/tests/errata/fake_errata_tool.py index b6d67f52..c0136029 100644 --- a/tests/errata/fake_errata_tool.py +++ b/tests/errata/fake_errata_tool.py @@ -42,10 +42,13 @@ def _get_data(self, advisory_id, key): out = self._ctrl._data[advisory_id] if out is None: raise Fault(100, "No such advisory: %s" % advisory_id) - return out[key] + return out.get(key, {}) def get_advisory_cdn_file_list(self, advisory_id): return self._get_data(advisory_id, "cdn_file_list") def get_advisory_cdn_metadata(self, advisory_id): return self._get_data(advisory_id, "cdn_metadata") + + def get_ftp_paths(self, advisory_id): + return self._get_data(advisory_id, "ftp_paths") diff --git a/tests/errata/test_errata_modules.py b/tests/errata/test_errata_modules.py index 363fa20e..70ae3bd8 100644 --- a/tests/errata/test_errata_modules.py +++ b/tests/errata/test_errata_modules.py @@ -199,6 +199,44 @@ def test_errata_modules_via_koji(fake_errata_tool, fake_koji, koji_dir): found_rpm_names = [item.name for item in rpm_items] assert sorted(found_rpm_names) == sorted(rpm_filenames) + # FTP paths should also be reflected in RPM dests; we'll just check + # src RPMs since those are the only ones with FTP paths + src_rpms = sorted( + [(i.name, sorted(i.dest)) for i in rpm_items if i.name.endswith(".src.rpm")] + ) + assert src_rpms == [ + ( + "pgaudit-1.4.0-4.module+el8.1.1+4794+c82b6e09.src.rpm", + [ + "/ftp/pub/redhat/linux/enterprise/AppStream-8.1.1.MAIN/en/os/SRPMS/", + "rhel-8-for-aarch64-appstream-source-rpms__8", + "rhel-8-for-ppc64le-appstream-source-rpms__8", + "rhel-8-for-s390x-appstream-source-rpms__8", + "rhel-8-for-x86_64-appstream-source-rpms__8", + ], + ), + ( + "postgres-decoderbufs-0.10.0-2.module+el8.1.1+4794+c82b6e09.src.rpm", + [ + "/ftp/pub/redhat/linux/enterprise/AppStream-8.1.1.MAIN/en/os/SRPMS/", + "rhel-8-for-aarch64-appstream-source-rpms__8", + "rhel-8-for-ppc64le-appstream-source-rpms__8", + "rhel-8-for-s390x-appstream-source-rpms__8", + "rhel-8-for-x86_64-appstream-source-rpms__8", + ], + ), + ( + "postgresql-12.1-2.module+el8.1.1+4794+c82b6e09.src.rpm", + [ + "/ftp/pub/redhat/linux/enterprise/AppStream-8.1.1.MAIN/en/os/SRPMS/", + "rhel-8-for-aarch64-appstream-source-rpms__8", + "rhel-8-for-ppc64le-appstream-source-rpms__8", + "rhel-8-for-s390x-appstream-source-rpms__8", + "rhel-8-for-x86_64-appstream-source-rpms__8", + ], + ), + ] + # It should have found the modulemd files assert sorted(modulemd_items, key=lambda item: item.src) == [ ModuleMdPushItem(