/
utils.py
252 lines (202 loc) · 7.79 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
# coding=utf-8
"""Utilities for tests for the rpm plugin."""
import os
from functools import partial
import requests
from io import StringIO
from unittest import SkipTest
from time import sleep
from tempfile import NamedTemporaryFile
from pulp_smash import api, cli, selectors
from pulp_smash.pulp3.utils import (
gen_remote,
gen_repo,
get_content,
require_pulp_3,
require_pulp_plugins,
sync,
)
from pulp_rpm.tests.functional.constants import (
RPM_CONTENT_PATH,
RPM_COPY_PATH,
RPM_PACKAGE_CONTENT_NAME,
RPM_PUBLICATION_PATH,
RPM_REMOTE_PATH,
RPM_REPO_PATH,
RPM_SIGNED_FIXTURE_URL,
RPM_SIGNED_URL,
RPM_UNSIGNED_FIXTURE_URL,
)
from pulpcore.client.pulpcore import (
ApiClient as CoreApiClient,
ArtifactsApi,
Configuration,
TasksApi,
)
from pulpcore.client.pulp_rpm import ApiClient as RpmApiClient
configuration = Configuration()
configuration.username = "admin"
configuration.password = "password"
configuration.safe_chars_for_path_param = "/"
def set_up_module():
"""Skip tests Pulp 3 isn't under test or if pulp_rpm isn't installed."""
require_pulp_3(SkipTest)
require_pulp_plugins({'pulp_rpm'}, SkipTest)
def gen_rpm_client():
"""Return an OBJECT for rpm client."""
return RpmApiClient(configuration)
def gen_rpm_remote(url=None, **kwargs):
"""Return a semi-random dict for use in creating a RPM remote.
:param url: The URL of an external content source.
"""
if url is None:
url = RPM_UNSIGNED_FIXTURE_URL
return gen_remote(url, **kwargs)
def get_rpm_package_paths(repo):
"""Return the relative path of content units present in a RPM repository.
:param repo: A dict of information about the repository.
:returns: A list with the paths of units present in a given repository.
"""
return [
content_unit['location_href']
for content_unit in get_content(repo)[RPM_PACKAGE_CONTENT_NAME]
if 'location_href' in content_unit
]
def populate_pulp(cfg, url=RPM_SIGNED_FIXTURE_URL):
"""Add RPM contents to Pulp.
:param pulp_smash.config.PulpSmashConfig: Information about a Pulp
application.
:param url: The RPM repository URL. Defaults to
:data:`pulp_smash.constants.RPM_UNSIGNED_FIXTURE_URL`
:returns: A list of dicts, where each dict describes one RPM content in
Pulp.
"""
client = api.Client(cfg, api.json_handler)
remote = {}
repo = {}
try:
remote.update(client.post(RPM_REMOTE_PATH, gen_rpm_remote(url)))
repo.update(client.post(RPM_REPO_PATH, gen_repo()))
sync(cfg, remote, repo)
finally:
if remote:
client.delete(remote['pulp_href'])
if repo:
client.delete(repo['pulp_href'])
return client.get(RPM_CONTENT_PATH)['results']
def rpm_copy(cfg, source_repo, dest_repo, criteria=None, content=None, recursive=False):
"""Sync a repository.
:param pulp_smash.config.PulpSmashConfig cfg: Information about the Pulp
host.
:param remote: A dict of information about the remote of the repository
to be synced.
:param repo: A dict of information about the repository.
:param kwargs: Keyword arguments to be merged in to the request data.
:returns: The server's response. A dict of information about the just
created sync.
"""
client = api.Client(cfg)
data = {
'source_repo': source_repo['pulp_href'],
'dest_repo': dest_repo['pulp_href'],
'dependency_solving': recursive,
}
if criteria:
data['criteria'] = criteria
if content:
data['content'] = content
return client.post(RPM_COPY_PATH, data)
def gen_yum_config_file(cfg, repositoryid, baseurl, name, **kwargs):
"""Generate a yum configuration file and write it to ``/etc/yum.repos.d/``.
Generate a yum configuration file containing a single repository section,
and write it to ``/etc/yum.repos.d/{repositoryid}.repo``.
:param cfg: The system on which to create
a yum configuration file.
:param repositoryid: The section's ``repositoryid``. Used when naming the
configuration file and populating the brackets at the head of the file.
For details, see yum.conf(5).
:param baseurl: The required option ``baseurl`` specifying the url of repo.
For details, see yum.conf(5)
:param name: The required option ``name`` specifying the name of repo.
For details, see yum.conf(5).
:param kwargs: Section options. Each kwarg corresponds to one option. For
details, see yum.conf(5).
:returns: The path to the yum configuration file.
"""
# required repo options
kwargs.setdefault('name', name)
kwargs.setdefault('baseurl', baseurl)
# assume some common used defaults
kwargs.setdefault('enabled', 1)
kwargs.setdefault('gpgcheck', 0)
kwargs.setdefault('metadata_expire', 0) # force metadata load every time
# Check if the settings specifies a content host role else assume ``api``
try:
content_host = cfg.get_hosts('content')[0].roles['content']
except IndexError:
content_host = cfg.get_hosts('api')[0].roles['api']
# if sslverify is not provided in kwargs it is inferred from cfg
kwargs.setdefault(
'sslverify', content_host.get('verify') and 'yes' or 'no'
)
path = os.path.join('/etc/yum.repos.d/', repositoryid + '.repo')
with StringIO() as section:
section.write('[{}]\n'.format(repositoryid))
for key, value in kwargs.items():
section.write('{} = {}\n'.format(key, value))
# machine.session is used here to keep SSH session open
cli.Client(cfg).machine.session().run(
'echo "{}" | {}tee {} > /dev/null'.format(
section.getvalue(),
'' if cli.is_root(cfg) else 'sudo ',
path
)
)
return path
def publish(cfg, repo, version_href=None):
"""Publish a repository.
:param pulp_smash.config.PulpSmashConfig cfg: Information about the Pulp
host.
:param repo: A dict of information about the repository.
:param version_href: A href for the repo version to be published.
:returns: A publication. A dict of information about the just created
publication.
"""
if version_href:
body = {"repository_version": version_href}
else:
body = {"repository": repo["pulp_href"]}
client = api.Client(cfg, api.json_handler)
call_report = client.post(RPM_PUBLICATION_PATH, body)
tasks = tuple(api.poll_spawned_tasks(cfg, call_report))
return client.get(tasks[-1]["created_resources"][0])
skip_if = partial(selectors.skip_if, exc=SkipTest)
"""The ``@skip_if`` decorator, customized for unittest.
:func:`pulp_smash.selectors.skip_if` is test runner agnostic. This function is
identical, except that ``exc`` has been set to ``unittest.SkipTest``.
"""
core_client = CoreApiClient(configuration)
tasks = TasksApi(core_client)
def gen_artifact(url=RPM_SIGNED_URL):
"""Creates an artifact."""
response = requests.get(url)
with NamedTemporaryFile() as temp_file:
temp_file.write(response.content)
artifact = ArtifactsApi(core_client).create(file=temp_file.name)
return artifact.to_dict()
def monitor_task(task_href):
"""Polls the Task API until the task is in a completed state.
Prints the task details and a success or failure message. Exits on failure.
Args:
task_href(str): The href of the task to monitor
Returns:
list[str]: List of hrefs that identify resource created by the task
"""
completed = ["completed", "failed", "canceled"]
task = tasks.read(task_href)
while task.state not in completed:
sleep(2)
task = tasks.read(task_href)
if task.state == "completed":
return task.created_resources
return task.to_dict()