Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
now also validated.
- `ModuleMdPushItem.name` now uses the NSVCA of a module rather than the filename of
a loaded modulemd file, when this metadata is available.
- `errata` source now includes FTP paths in the `dest` field of generated push items,
where applicable.

## [2.6.0] - 2021-05-27

Expand Down
1 change: 1 addition & 0 deletions src/pushsource/_impl/backend/errata_source/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .errata_source import ErrataSource
56 changes: 56 additions & 0 deletions src/pushsource/_impl/backend/errata_source/errata_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import logging
import threading
from functools import partial

from six.moves import xmlrpc_client
from more_executors import Executors
from more_executors.futures import f_zip, f_map

from ...compat_attr import attr

LOG = logging.getLogger("pushsource.errata_client")


@attr.s()
class ErrataRaw(object):
# Helper to collect raw responses of all ET APIs for a single advisory
advisory_cdn_metadata = attr.ib(type=dict)
advisory_cdn_file_list = attr.ib(type=dict)
ftp_paths = attr.ib(type=dict)


class ErrataClient(object):
def __init__(self, threads, url):
self._executor = Executors.thread_pool(max_workers=threads).with_retry()
self._url = url
self._tls = threading.local()

self._get_advisory_cdn_metadata = partial(
self._call_et, "get_advisory_cdn_metadata"
)
self._get_advisory_cdn_file_list = partial(
self._call_et, "get_advisory_cdn_file_list"
)
self._get_ftp_paths = partial(self._call_et, "get_ftp_paths")

@property
def _errata_service(self):
# XML-RPC client connected to errata_service.
# Each thread uses a separate client.
if not hasattr(self._tls, "errata_service"):
LOG.debug("Creating XML-RPC client for Errata Tool: %s", self._url)
self._tls.errata_service = xmlrpc_client.ServerProxy(self._url)
return self._tls.errata_service

def get_raw_f(self, advisory_id):
"""Returns Future[ErrataRaw] holding all ET responses for a particular advisory."""
all_responses = f_zip(
self._executor.submit(self._get_advisory_cdn_metadata, advisory_id),
self._executor.submit(self._get_advisory_cdn_file_list, advisory_id),
self._executor.submit(self._get_ftp_paths, advisory_id),
)
return f_map(all_responses, lambda tup: ErrataRaw(*tup))

def _call_et(self, method_name, advisory_id):
method = getattr(self._errata_service, method_name)
return method(advisory_id)
Original file line number Diff line number Diff line change
@@ -1,34 +1,25 @@
import os
import threading
import logging
from concurrent import futures

import six
from six.moves.urllib import parse
from six.moves.xmlrpc_client import ServerProxy
from more_executors import Executors
from more_executors.futures import f_map, f_zip

from .. import compat_attr as attr
from ..source import Source
from ..model import ErratumPushItem
from ..helpers import list_argument
from .errata_client import ErrataClient

from ... import compat_attr as attr
from ...source import Source
from ...model import ErratumPushItem
from ...helpers import list_argument

LOG = logging.getLogger("pushsource")


class ErrataSource(Source):
"""Uses an advisory from Errata Tool as the source of push items."""

def __init__(
self,
url,
errata,
target="cdn",
koji_source=None,
threads=4,
timeout=60 * 60 * 4,
):
def __init__(self, url, errata, koji_source=None, threads=4, timeout=60 * 60 * 4):
"""Create a new source.

Parameters:
Expand All @@ -41,13 +32,6 @@ def __init__(
If a single string is given, multiple IDs may be
comma-separated.

target (str)
The target type used when querying Errata Tool.
The target type may influence the content produced,
depending on the Errata Tool settings.

Valid values include at least: "cdn", "rhn", "ftp".

koji_source (str)
URL of a koji source associated with this Errata Tool
instance.
Expand All @@ -62,16 +46,18 @@ def __init__(
"""
self._url = url
self._errata = list_argument(errata)
self._executor = Executors.thread_pool(max_workers=threads).with_retry()
self._client = ErrataClient(threads=threads, url=self._errata_service_url)

# This executor doesn't use retry because koji & ET executors already do that.
self._executor = Executors.thread_pool(max_workers=threads)

# We set aside a separate thread pool for koji so that there are separate
# queues for ET and koji calls, yet we avoid creating a new thread pool for
# each koji source.
self._koji_executor = Executors.thread_pool(max_workers=threads).with_retry()
self._koji_cache = {}
self._koji_source_url = koji_source
self._target = target
self._timeout = timeout
self._tls = threading.local()

@property
def _errata_service_url(self):
Expand All @@ -98,49 +84,33 @@ def _koji_source(self, **kwargs):
**kwargs
)

@property
def _errata_service(self):
# XML-RPC client connected to errata_service.
# Each thread uses a separate client.
if not hasattr(self._tls, "errata_service"):
url = self._errata_service_url
LOG.debug("Creating XML-RPC client for Errata Tool: %s", url)
self._tls.errata_service = ServerProxy(url)
return self._tls.errata_service

@property
def _advisory_ids(self):
# TODO: other cases (comma-separated; plain string)
return self._errata

def _get_advisory_metadata(self, advisory_id):
# TODO: use CDN API even for RHN?
return self._errata_service.get_advisory_cdn_metadata(advisory_id)

def _get_advisory_file_list(self, advisory_id):
# TODO: look at the target hint
return self._errata_service.get_advisory_cdn_file_list(advisory_id)

def _push_items_from_raw(self, metadata_and_file_list):
(metadata, file_list) = metadata_and_file_list
def _push_items_from_raw(self, raw):
erratum = ErratumPushItem._from_data(raw.advisory_cdn_metadata)

erratum = ErratumPushItem._from_data(metadata)
files = self._push_items_from_files(erratum, file_list)
rpms = self._push_items_from_rpms(erratum, raw.advisory_cdn_file_list)

# The erratum should go to all the same destinations as the files.
# The erratum should go to all the same destinations as the rpms,
# before FTP paths are added.
erratum_dest = set(erratum.dest or [])
for file in files:
for dest in file.dest:
for rpm in rpms:
for dest in rpm.dest:
erratum_dest.add(dest)

erratum = attr.evolve(erratum, dest=sorted(erratum_dest))

return [erratum] + files
# Enrich RPM push items with their FTP paths if any.
rpms = self._add_ftp_paths(rpms, raw.ftp_paths)

return [erratum] + rpms

def _push_items_from_files(self, erratum, file_list):
def _push_items_from_rpms(self, erratum, rpm_list):
out = []

for build_nvr, build_info in six.iteritems(file_list):
for build_nvr, build_info in six.iteritems(rpm_list):
out.extend(self._rpm_push_items_from_build(erratum, build_info))
out.extend(
self._module_push_items_from_build(erratum, build_nvr, build_info)
Expand Down Expand Up @@ -217,40 +187,52 @@ def _rpm_push_items_from_build(self, erratum, build_info):

return out

def __iter__(self):
# Get file list of all advisories first.
def _add_ftp_paths(self, rpm_items, ftp_paths):
# ftp_paths structure is like this:
#
# {
# "xorg-x11-server-1.20.4-16.el7_9": {
# "rpms": {
# "xorg-x11-server-1.20.4-16.el7_9.src.rpm": [
# "/ftp/pub/redhat/linux/enterprise/7Client/en/os/SRPMS/",
# "/ftp/pub/redhat/linux/enterprise/7ComputeNode/en/os/SRPMS/",
# "/ftp/pub/redhat/linux/enterprise/7Server/en/os/SRPMS/",
# "/ftp/pub/redhat/linux/enterprise/7Workstation/en/os/SRPMS/",
# ]
# },
# "sig_key": "fd431d51",
# }
# }
#
# We only care about the rpm => ftp path mapping, which should be added onto
# our existing rpm push items if any exist.
#
# We get the file list first because it contains koji build NVRs,
# so by getting these first we can issue koji build queries and advisory
# metadata queries concurrently.
file_list_fs = [
self._executor.submit(self._get_advisory_file_list, advisory_id)
for advisory_id in self._advisory_ids
]

# Then get metadata.
metadata_fs = [
self._executor.submit(self._get_advisory_metadata, advisory_id)
for advisory_id in self._advisory_ids
]

# Zip up the metadata and file list for each advisory, since both are
# needed together in order to make push items.
advisory_fs = [
f_zip(metadata_f, file_list_f)
for (metadata_f, file_list_f) in zip(metadata_fs, file_list_fs)
]
rpm_to_paths = {}
for rpm_map in ftp_paths.values():
for (rpm_name, paths) in (rpm_map.get("rpms") or {}).items():
rpm_to_paths[rpm_name] = paths

out = []
for item in rpm_items:
paths = rpm_to_paths.get(item.name) or []
item = attr.evolve(item, dest=item.dest + paths)
out.append(item)

return out

def __iter__(self):
# Get raw ET responses for all errata.
raw_fs = [self._client.get_raw_f(id) for id in self._advisory_ids]

# Convert them to lists of push items
advisory_push_items_fs = [
f_map(f, self._push_items_from_raw) for f in advisory_fs
]
push_items_fs = []
for f in futures.as_completed(raw_fs, timeout=self._timeout):
push_items_fs.append(
self._executor.submit(self._push_items_from_raw, f.result())
)

completed_fs = futures.as_completed(
advisory_push_items_fs, timeout=self._timeout
)
completed_fs = futures.as_completed(push_items_fs, timeout=self._timeout)
for f in completed_fs:
# If an exception occurred, this is where it will be raised.
for pushitem in f.result():
yield pushitem

Expand Down
4 changes: 3 additions & 1 deletion tests/errata/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
@fixture
def fake_errata_tool():
controller = FakeErrataToolController()
with patch("pushsource._impl.backend.errata_source.ServerProxy") as mock_proxy:
with patch(
"pushsource._impl.backend.errata_source.errata_client.xmlrpc_client.ServerProxy"
) as mock_proxy:
mock_proxy.side_effect = controller.proxy
yield controller

Expand Down
15 changes: 15 additions & 0 deletions tests/errata/data/RHEA-2020:0346.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
advisory_id: RHEA-2020:0346

cdn_file_list:
postgresql-12-8010120191120141335.e4e244f9:
checksums:
Expand Down Expand Up @@ -523,6 +524,7 @@ cdn_file_list:
postgresql-upgrade-devel-debuginfo-12.1-2.module+el8.1.1+4794+c82b6e09.x86_64.rpm:
- rhel-8-for-x86_64-appstream-debug-rpms__8
sig_key: fd431d51

cdn_metadata:
description: 'This enhancement update adds the postgresql:12 module stream to Red
Hat Enterprise Linux 8. (BZ#1721822)
Expand Down Expand Up @@ -2235,3 +2237,16 @@ cdn_metadata:
type: enhancement
updated: 2020-02-04 11:39:30 UTC
version: '4'

ftp_paths:
postgresql-12-8010120191120141335.e4e244f9:
sig_key: fd431d51
rpms:
pgaudit-1.4.0-4.module+el8.1.1+4794+c82b6e09.src.rpm:
- /ftp/pub/redhat/linux/enterprise/AppStream-8.1.1.MAIN/en/os/SRPMS/
postgresql-12.1-2.module+el8.1.1+4794+c82b6e09.src.rpm:
- /ftp/pub/redhat/linux/enterprise/AppStream-8.1.1.MAIN/en/os/SRPMS/
postgres-decoderbufs-0.10.0-2.module+el8.1.1+4794+c82b6e09.src.rpm:
- /ftp/pub/redhat/linux/enterprise/AppStream-8.1.1.MAIN/en/os/SRPMS/
modules:
- /ftp/pub/redhat/linux/enterprise/AppStream-8.1.1.MAIN/en/os/modules/
5 changes: 4 additions & 1 deletion tests/errata/fake_errata_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,13 @@ def _get_data(self, advisory_id, key):
out = self._ctrl._data[advisory_id]
if out is None:
raise Fault(100, "No such advisory: %s" % advisory_id)
return out[key]
return out.get(key, {})

def get_advisory_cdn_file_list(self, advisory_id):
return self._get_data(advisory_id, "cdn_file_list")

def get_advisory_cdn_metadata(self, advisory_id):
return self._get_data(advisory_id, "cdn_metadata")

def get_ftp_paths(self, advisory_id):
return self._get_data(advisory_id, "ftp_paths")
38 changes: 38 additions & 0 deletions tests/errata/test_errata_modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,44 @@ def test_errata_modules_via_koji(fake_errata_tool, fake_koji, koji_dir):
found_rpm_names = [item.name for item in rpm_items]
assert sorted(found_rpm_names) == sorted(rpm_filenames)

# FTP paths should also be reflected in RPM dests; we'll just check
# src RPMs since those are the only ones with FTP paths
src_rpms = sorted(
[(i.name, sorted(i.dest)) for i in rpm_items if i.name.endswith(".src.rpm")]
)
assert src_rpms == [
(
"pgaudit-1.4.0-4.module+el8.1.1+4794+c82b6e09.src.rpm",
[
"/ftp/pub/redhat/linux/enterprise/AppStream-8.1.1.MAIN/en/os/SRPMS/",
"rhel-8-for-aarch64-appstream-source-rpms__8",
"rhel-8-for-ppc64le-appstream-source-rpms__8",
"rhel-8-for-s390x-appstream-source-rpms__8",
"rhel-8-for-x86_64-appstream-source-rpms__8",
],
),
(
"postgres-decoderbufs-0.10.0-2.module+el8.1.1+4794+c82b6e09.src.rpm",
[
"/ftp/pub/redhat/linux/enterprise/AppStream-8.1.1.MAIN/en/os/SRPMS/",
"rhel-8-for-aarch64-appstream-source-rpms__8",
"rhel-8-for-ppc64le-appstream-source-rpms__8",
"rhel-8-for-s390x-appstream-source-rpms__8",
"rhel-8-for-x86_64-appstream-source-rpms__8",
],
),
(
"postgresql-12.1-2.module+el8.1.1+4794+c82b6e09.src.rpm",
[
"/ftp/pub/redhat/linux/enterprise/AppStream-8.1.1.MAIN/en/os/SRPMS/",
"rhel-8-for-aarch64-appstream-source-rpms__8",
"rhel-8-for-ppc64le-appstream-source-rpms__8",
"rhel-8-for-s390x-appstream-source-rpms__8",
"rhel-8-for-x86_64-appstream-source-rpms__8",
],
),
]

# It should have found the modulemd files
assert sorted(modulemd_items, key=lambda item: item.src) == [
ModuleMdPushItem(
Expand Down