From 552c53ee386026c5280d13e37c93ea95c4bad269 Mon Sep 17 00:00:00 2001 From: Rohan McGovern Date: Wed, 9 Jun 2021 16:06:07 +1000 Subject: [PATCH] errata: support FTP paths for RPMs Integrate support for get_ftp_paths API into errata backend. This can now result in the 'dest' for one RPM push item having a mixture of rhsm-pulp repo IDs and FTP paths. This will allow alt-src pushes to work on top of errata backend. Other tweaks include: - drop the 'target' argument to errata backend (which was not used). It seems we ought not to need this. For now the design now is that the source will give you back all kinds of destinations (and if you only want to deal with certain types, e.g. you don't want the ftp paths, you need to filter them yourself). It's possible some optional filtering arguments will be added here later. - separate the low-level code which calls ET methods into a new errata_client module Note: the backend remains marked as a technical preview with no stability guarantee. --- CHANGELOG.md | 2 + .../_impl/backend/errata_source/__init__.py | 1 + .../backend/errata_source/errata_client.py | 56 +++++++ .../{ => errata_source}/errata_source.py | 150 ++++++++---------- tests/errata/conftest.py | 4 +- tests/errata/data/RHEA-2020:0346.yaml | 15 ++ tests/errata/fake_errata_tool.py | 5 +- tests/errata/test_errata_modules.py | 38 +++++ 8 files changed, 185 insertions(+), 86 deletions(-) create mode 100644 src/pushsource/_impl/backend/errata_source/__init__.py create mode 100644 src/pushsource/_impl/backend/errata_source/errata_client.py rename src/pushsource/_impl/backend/{ => errata_source}/errata_source.py (66%) diff --git a/CHANGELOG.md b/CHANGELOG.md index cee00b5d..ec2f7512 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 now also validated. - `ModuleMdPushItem.name` now uses the NSVCA of a module rather than the filename of a loaded modulemd file, when this metadata is available. +- `errata` source now includes FTP paths in the `dest` field of generated push items, + where applicable. ## [2.6.0] - 2021-05-27 diff --git a/src/pushsource/_impl/backend/errata_source/__init__.py b/src/pushsource/_impl/backend/errata_source/__init__.py new file mode 100644 index 00000000..1785f29b --- /dev/null +++ b/src/pushsource/_impl/backend/errata_source/__init__.py @@ -0,0 +1 @@ +from .errata_source import ErrataSource diff --git a/src/pushsource/_impl/backend/errata_source/errata_client.py b/src/pushsource/_impl/backend/errata_source/errata_client.py new file mode 100644 index 00000000..fc333dd1 --- /dev/null +++ b/src/pushsource/_impl/backend/errata_source/errata_client.py @@ -0,0 +1,56 @@ +import logging +import threading +from functools import partial + +from six.moves import xmlrpc_client +from more_executors import Executors +from more_executors.futures import f_zip, f_map + +from ...compat_attr import attr + +LOG = logging.getLogger("pushsource.errata_client") + + +@attr.s() +class ErrataRaw(object): + # Helper to collect raw responses of all ET APIs for a single advisory + advisory_cdn_metadata = attr.ib(type=dict) + advisory_cdn_file_list = attr.ib(type=dict) + ftp_paths = attr.ib(type=dict) + + +class ErrataClient(object): + def __init__(self, threads, url): + self._executor = Executors.thread_pool(max_workers=threads).with_retry() + self._url = url + self._tls = threading.local() + + self._get_advisory_cdn_metadata = partial( + self._call_et, "get_advisory_cdn_metadata" + ) + self._get_advisory_cdn_file_list = partial( + self._call_et, "get_advisory_cdn_file_list" + ) + self._get_ftp_paths = partial(self._call_et, "get_ftp_paths") + + @property + def _errata_service(self): + # XML-RPC client connected to errata_service. + # Each thread uses a separate client. + if not hasattr(self._tls, "errata_service"): + LOG.debug("Creating XML-RPC client for Errata Tool: %s", self._url) + self._tls.errata_service = xmlrpc_client.ServerProxy(self._url) + return self._tls.errata_service + + def get_raw_f(self, advisory_id): + """Returns Future[ErrataRaw] holding all ET responses for a particular advisory.""" + all_responses = f_zip( + self._executor.submit(self._get_advisory_cdn_metadata, advisory_id), + self._executor.submit(self._get_advisory_cdn_file_list, advisory_id), + self._executor.submit(self._get_ftp_paths, advisory_id), + ) + return f_map(all_responses, lambda tup: ErrataRaw(*tup)) + + def _call_et(self, method_name, advisory_id): + method = getattr(self._errata_service, method_name) + return method(advisory_id) diff --git a/src/pushsource/_impl/backend/errata_source.py b/src/pushsource/_impl/backend/errata_source/errata_source.py similarity index 66% rename from src/pushsource/_impl/backend/errata_source.py rename to src/pushsource/_impl/backend/errata_source/errata_source.py index 45d213b0..46a60e7b 100644 --- a/src/pushsource/_impl/backend/errata_source.py +++ b/src/pushsource/_impl/backend/errata_source/errata_source.py @@ -1,18 +1,17 @@ import os -import threading import logging from concurrent import futures import six from six.moves.urllib import parse -from six.moves.xmlrpc_client import ServerProxy from more_executors import Executors -from more_executors.futures import f_map, f_zip -from .. import compat_attr as attr -from ..source import Source -from ..model import ErratumPushItem -from ..helpers import list_argument +from .errata_client import ErrataClient + +from ... import compat_attr as attr +from ...source import Source +from ...model import ErratumPushItem +from ...helpers import list_argument LOG = logging.getLogger("pushsource") @@ -20,15 +19,7 @@ class ErrataSource(Source): """Uses an advisory from Errata Tool as the source of push items.""" - def __init__( - self, - url, - errata, - target="cdn", - koji_source=None, - threads=4, - timeout=60 * 60 * 4, - ): + def __init__(self, url, errata, koji_source=None, threads=4, timeout=60 * 60 * 4): """Create a new source. Parameters: @@ -41,13 +32,6 @@ def __init__( If a single string is given, multiple IDs may be comma-separated. - target (str) - The target type used when querying Errata Tool. - The target type may influence the content produced, - depending on the Errata Tool settings. - - Valid values include at least: "cdn", "rhn", "ftp". - koji_source (str) URL of a koji source associated with this Errata Tool instance. @@ -62,16 +46,18 @@ def __init__( """ self._url = url self._errata = list_argument(errata) - self._executor = Executors.thread_pool(max_workers=threads).with_retry() + self._client = ErrataClient(threads=threads, url=self._errata_service_url) + + # This executor doesn't use retry because koji & ET executors already do that. + self._executor = Executors.thread_pool(max_workers=threads) + # We set aside a separate thread pool for koji so that there are separate # queues for ET and koji calls, yet we avoid creating a new thread pool for # each koji source. self._koji_executor = Executors.thread_pool(max_workers=threads).with_retry() self._koji_cache = {} self._koji_source_url = koji_source - self._target = target self._timeout = timeout - self._tls = threading.local() @property def _errata_service_url(self): @@ -98,49 +84,33 @@ def _koji_source(self, **kwargs): **kwargs ) - @property - def _errata_service(self): - # XML-RPC client connected to errata_service. - # Each thread uses a separate client. - if not hasattr(self._tls, "errata_service"): - url = self._errata_service_url - LOG.debug("Creating XML-RPC client for Errata Tool: %s", url) - self._tls.errata_service = ServerProxy(url) - return self._tls.errata_service - @property def _advisory_ids(self): # TODO: other cases (comma-separated; plain string) return self._errata - def _get_advisory_metadata(self, advisory_id): - # TODO: use CDN API even for RHN? - return self._errata_service.get_advisory_cdn_metadata(advisory_id) - - def _get_advisory_file_list(self, advisory_id): - # TODO: look at the target hint - return self._errata_service.get_advisory_cdn_file_list(advisory_id) - - def _push_items_from_raw(self, metadata_and_file_list): - (metadata, file_list) = metadata_and_file_list + def _push_items_from_raw(self, raw): + erratum = ErratumPushItem._from_data(raw.advisory_cdn_metadata) - erratum = ErratumPushItem._from_data(metadata) - files = self._push_items_from_files(erratum, file_list) + rpms = self._push_items_from_rpms(erratum, raw.advisory_cdn_file_list) - # The erratum should go to all the same destinations as the files. + # The erratum should go to all the same destinations as the rpms, + # before FTP paths are added. erratum_dest = set(erratum.dest or []) - for file in files: - for dest in file.dest: + for rpm in rpms: + for dest in rpm.dest: erratum_dest.add(dest) - erratum = attr.evolve(erratum, dest=sorted(erratum_dest)) - return [erratum] + files + # Enrich RPM push items with their FTP paths if any. + rpms = self._add_ftp_paths(rpms, raw.ftp_paths) + + return [erratum] + rpms - def _push_items_from_files(self, erratum, file_list): + def _push_items_from_rpms(self, erratum, rpm_list): out = [] - for build_nvr, build_info in six.iteritems(file_list): + for build_nvr, build_info in six.iteritems(rpm_list): out.extend(self._rpm_push_items_from_build(erratum, build_info)) out.extend( self._module_push_items_from_build(erratum, build_nvr, build_info) @@ -217,40 +187,52 @@ def _rpm_push_items_from_build(self, erratum, build_info): return out - def __iter__(self): - # Get file list of all advisories first. + def _add_ftp_paths(self, rpm_items, ftp_paths): + # ftp_paths structure is like this: + # + # { + # "xorg-x11-server-1.20.4-16.el7_9": { + # "rpms": { + # "xorg-x11-server-1.20.4-16.el7_9.src.rpm": [ + # "/ftp/pub/redhat/linux/enterprise/7Client/en/os/SRPMS/", + # "/ftp/pub/redhat/linux/enterprise/7ComputeNode/en/os/SRPMS/", + # "/ftp/pub/redhat/linux/enterprise/7Server/en/os/SRPMS/", + # "/ftp/pub/redhat/linux/enterprise/7Workstation/en/os/SRPMS/", + # ] + # }, + # "sig_key": "fd431d51", + # } + # } + # + # We only care about the rpm => ftp path mapping, which should be added onto + # our existing rpm push items if any exist. # - # We get the file list first because it contains koji build NVRs, - # so by getting these first we can issue koji build queries and advisory - # metadata queries concurrently. - file_list_fs = [ - self._executor.submit(self._get_advisory_file_list, advisory_id) - for advisory_id in self._advisory_ids - ] - - # Then get metadata. - metadata_fs = [ - self._executor.submit(self._get_advisory_metadata, advisory_id) - for advisory_id in self._advisory_ids - ] - - # Zip up the metadata and file list for each advisory, since both are - # needed together in order to make push items. - advisory_fs = [ - f_zip(metadata_f, file_list_f) - for (metadata_f, file_list_f) in zip(metadata_fs, file_list_fs) - ] + rpm_to_paths = {} + for rpm_map in ftp_paths.values(): + for (rpm_name, paths) in (rpm_map.get("rpms") or {}).items(): + rpm_to_paths[rpm_name] = paths + + out = [] + for item in rpm_items: + paths = rpm_to_paths.get(item.name) or [] + item = attr.evolve(item, dest=item.dest + paths) + out.append(item) + + return out + + def __iter__(self): + # Get raw ET responses for all errata. + raw_fs = [self._client.get_raw_f(id) for id in self._advisory_ids] # Convert them to lists of push items - advisory_push_items_fs = [ - f_map(f, self._push_items_from_raw) for f in advisory_fs - ] + push_items_fs = [] + for f in futures.as_completed(raw_fs, timeout=self._timeout): + push_items_fs.append( + self._executor.submit(self._push_items_from_raw, f.result()) + ) - completed_fs = futures.as_completed( - advisory_push_items_fs, timeout=self._timeout - ) + completed_fs = futures.as_completed(push_items_fs, timeout=self._timeout) for f in completed_fs: - # If an exception occurred, this is where it will be raised. for pushitem in f.result(): yield pushitem diff --git a/tests/errata/conftest.py b/tests/errata/conftest.py index b6950414..c20bbee7 100644 --- a/tests/errata/conftest.py +++ b/tests/errata/conftest.py @@ -8,7 +8,9 @@ @fixture def fake_errata_tool(): controller = FakeErrataToolController() - with patch("pushsource._impl.backend.errata_source.ServerProxy") as mock_proxy: + with patch( + "pushsource._impl.backend.errata_source.errata_client.xmlrpc_client.ServerProxy" + ) as mock_proxy: mock_proxy.side_effect = controller.proxy yield controller diff --git a/tests/errata/data/RHEA-2020:0346.yaml b/tests/errata/data/RHEA-2020:0346.yaml index 7d29e08f..2b32d7e4 100644 --- a/tests/errata/data/RHEA-2020:0346.yaml +++ b/tests/errata/data/RHEA-2020:0346.yaml @@ -1,4 +1,5 @@ advisory_id: RHEA-2020:0346 + cdn_file_list: postgresql-12-8010120191120141335.e4e244f9: checksums: @@ -523,6 +524,7 @@ cdn_file_list: postgresql-upgrade-devel-debuginfo-12.1-2.module+el8.1.1+4794+c82b6e09.x86_64.rpm: - rhel-8-for-x86_64-appstream-debug-rpms__8 sig_key: fd431d51 + cdn_metadata: description: 'This enhancement update adds the postgresql:12 module stream to Red Hat Enterprise Linux 8. (BZ#1721822) @@ -2235,3 +2237,16 @@ cdn_metadata: type: enhancement updated: 2020-02-04 11:39:30 UTC version: '4' + +ftp_paths: + postgresql-12-8010120191120141335.e4e244f9: + sig_key: fd431d51 + rpms: + pgaudit-1.4.0-4.module+el8.1.1+4794+c82b6e09.src.rpm: + - /ftp/pub/redhat/linux/enterprise/AppStream-8.1.1.MAIN/en/os/SRPMS/ + postgresql-12.1-2.module+el8.1.1+4794+c82b6e09.src.rpm: + - /ftp/pub/redhat/linux/enterprise/AppStream-8.1.1.MAIN/en/os/SRPMS/ + postgres-decoderbufs-0.10.0-2.module+el8.1.1+4794+c82b6e09.src.rpm: + - /ftp/pub/redhat/linux/enterprise/AppStream-8.1.1.MAIN/en/os/SRPMS/ + modules: + - /ftp/pub/redhat/linux/enterprise/AppStream-8.1.1.MAIN/en/os/modules/ diff --git a/tests/errata/fake_errata_tool.py b/tests/errata/fake_errata_tool.py index b6d67f52..c0136029 100644 --- a/tests/errata/fake_errata_tool.py +++ b/tests/errata/fake_errata_tool.py @@ -42,10 +42,13 @@ def _get_data(self, advisory_id, key): out = self._ctrl._data[advisory_id] if out is None: raise Fault(100, "No such advisory: %s" % advisory_id) - return out[key] + return out.get(key, {}) def get_advisory_cdn_file_list(self, advisory_id): return self._get_data(advisory_id, "cdn_file_list") def get_advisory_cdn_metadata(self, advisory_id): return self._get_data(advisory_id, "cdn_metadata") + + def get_ftp_paths(self, advisory_id): + return self._get_data(advisory_id, "ftp_paths") diff --git a/tests/errata/test_errata_modules.py b/tests/errata/test_errata_modules.py index 363fa20e..70ae3bd8 100644 --- a/tests/errata/test_errata_modules.py +++ b/tests/errata/test_errata_modules.py @@ -199,6 +199,44 @@ def test_errata_modules_via_koji(fake_errata_tool, fake_koji, koji_dir): found_rpm_names = [item.name for item in rpm_items] assert sorted(found_rpm_names) == sorted(rpm_filenames) + # FTP paths should also be reflected in RPM dests; we'll just check + # src RPMs since those are the only ones with FTP paths + src_rpms = sorted( + [(i.name, sorted(i.dest)) for i in rpm_items if i.name.endswith(".src.rpm")] + ) + assert src_rpms == [ + ( + "pgaudit-1.4.0-4.module+el8.1.1+4794+c82b6e09.src.rpm", + [ + "/ftp/pub/redhat/linux/enterprise/AppStream-8.1.1.MAIN/en/os/SRPMS/", + "rhel-8-for-aarch64-appstream-source-rpms__8", + "rhel-8-for-ppc64le-appstream-source-rpms__8", + "rhel-8-for-s390x-appstream-source-rpms__8", + "rhel-8-for-x86_64-appstream-source-rpms__8", + ], + ), + ( + "postgres-decoderbufs-0.10.0-2.module+el8.1.1+4794+c82b6e09.src.rpm", + [ + "/ftp/pub/redhat/linux/enterprise/AppStream-8.1.1.MAIN/en/os/SRPMS/", + "rhel-8-for-aarch64-appstream-source-rpms__8", + "rhel-8-for-ppc64le-appstream-source-rpms__8", + "rhel-8-for-s390x-appstream-source-rpms__8", + "rhel-8-for-x86_64-appstream-source-rpms__8", + ], + ), + ( + "postgresql-12.1-2.module+el8.1.1+4794+c82b6e09.src.rpm", + [ + "/ftp/pub/redhat/linux/enterprise/AppStream-8.1.1.MAIN/en/os/SRPMS/", + "rhel-8-for-aarch64-appstream-source-rpms__8", + "rhel-8-for-ppc64le-appstream-source-rpms__8", + "rhel-8-for-s390x-appstream-source-rpms__8", + "rhel-8-for-x86_64-appstream-source-rpms__8", + ], + ), + ] + # It should have found the modulemd files assert sorted(modulemd_items, key=lambda item: item.src) == [ ModuleMdPushItem(