-
-
Notifications
You must be signed in to change notification settings - Fork 352
/
facts.py
351 lines (280 loc) · 9.7 KB
/
facts.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
"""
The pyinfra facts API. Facts enable pyinfra to collect remote server state which
is used to "diff" with the desired state, producing the final commands required
for a deploy.
Note that the facts API does *not* use the global in-context host, so it's
possible to call facts on hosts that are out of context (e.g. give me the IP of
this other host B while I operate on this host A).
"""
from __future__ import annotations
import re
from inspect import getcallargs
from socket import error as socket_error, timeout as timeout_error
from typing import (
TYPE_CHECKING,
Any,
Callable,
Generic,
Iterable,
Optional,
Type,
TypeVar,
Union,
cast,
)
import click
import gevent
from paramiko import SSHException
from pyinfra import logger
from pyinfra.api import StringCommand
from pyinfra.api.arguments import pop_global_arguments
from pyinfra.api.util import (
get_kwargs_str,
log_error_or_warning,
log_host_command_error,
make_hash,
print_host_combined_output,
)
from pyinfra.connectors.util import CommandOutput
from pyinfra.context import ctx_host, ctx_state
from pyinfra.progress import progress_spinner
from .arguments import CONNECTOR_ARGUMENT_KEYS
if TYPE_CHECKING:
from pyinfra.api.host import Host
from pyinfra.api.state import State
# Matched against the first stderr line of a failed fact command when a sudo
# user was requested: sudo reporting that the user does not exist.
SUDO_REGEX = r"^sudo: unknown user"
# Same purpose for su: a match on either pattern means the requested su user
# does not exist on the host.
SU_REGEXES = (
    r"^su: user .+ does not exist",
    r"^su: unknown login",
)
# Type variable for the processed value a fact produces.
T = TypeVar("T")
class FactBase(Generic[T]):
    """
    Base class for facts: a command to run on a host together with the logic
    that converts the command's output lines into a value of type ``T``.
    """

    # Dotted "<module>.<ClassName>" identifier, assigned for every subclass
    name: str

    abstract: bool = True

    # Optional shell override for running this fact's command
    shell_executable: Optional[str] = None
    # Optional executable the fact depends on
    requires_command: Optional[str] = None
    # Either a literal command or a callable that builds one from fact arguments
    command: Union[str, Callable]

    def __init_subclass__(cls) -> None:
        """
        Name each subclass after its module (with the ``pyinfra.facts.`` part
        removed) and its class name.
        """
        super().__init_subclass__()

        full_module = cls.__module__
        module_name = full_module.replace("pyinfra.facts.", "")
        cls.name = f"{module_name}.{cls.__name__}"

    @staticmethod
    def default() -> T:
        """
        Return the fallback value for this fact; the base implementation
        returns ``None``.
        """
        fallback = None
        return cast(T, fallback)

    def process(self, output: Iterable[str]) -> T:
        """
        Turn the output lines into the fact value; the base implementation
        joins them back into one newline-separated string.
        """
        # NOTE: TypeVar does not support a default, hence the str -> T cast
        joined = "\n".join(output)
        return cast(T, joined)

    def process_pipeline(self, args, output):
        """
        Process ``output[i]`` on its own for every ``args[i]`` and return the
        results as a dict keyed by the corresponding arg.
        """
        processed = {}
        for index, arg in enumerate(args):
            processed[arg] = self.process([output[index]])
        return processed
class ShortFactBase(Generic[T]):
    """
    Base class for "short" facts: they reuse the data of another fact (``fact``)
    and post-process it with ``process_data``.
    """

    # Dotted "<module>.<ClassName>" identifier, assigned for every subclass
    name: str
    # The fact class whose data this short fact is derived from
    fact: Type[FactBase]

    def __init_subclass__(cls) -> None:
        """
        Name each subclass after its module (with the ``pyinfra.facts.`` part
        removed) and its class name.
        """
        super().__init_subclass__()

        full_module = cls.__module__
        module_name = full_module.replace("pyinfra.facts.", "")
        cls.name = f"{module_name}.{cls.__name__}"

    @staticmethod
    def process_data(data):
        """
        Derive the short fact's value from the underlying fact data; the base
        implementation hands the data back unchanged.
        """
        return data
def get_short_facts(state: "State", host: "Host", short_fact, **kwargs):
    """
    Load the fact a short fact is built on and return that data after passing
    it through the short fact's ``process_data``.

    Any extra keyword arguments are forwarded to ``get_fact`` untouched.
    """
    # Fetch the underlying fact first, then instantiate the short fact to process it
    underlying_data = get_fact(state, host, short_fact.fact, **kwargs)
    short_fact_instance = short_fact()
    return short_fact_instance.process_data(underlying_data)
def _make_command(command_attribute, host_args):
if callable(command_attribute):
host_args.pop("self", None)
return command_attribute(**host_args)
return command_attribute
def _get_executor_kwargs(
    state: "State",
    host: "Host",
    override_kwargs: Optional[dict[str, Any]] = None,
    override_kwarg_keys: Optional[list[str]] = None,
):
    """
    Build the keyword arguments to pass to the connector when executing.

    The host's current operation global arguments (or, when there are none,
    the result of ``pop_global_arguments`` on an empty dict) are layered over
    ``override_kwargs``, except for keys listed in ``override_kwarg_keys``,
    which keep their override value. Only keys present in
    ``CONNECTOR_ARGUMENT_KEYS`` are returned.

    The caller's ``override_kwargs`` dict is never modified: merging happens
    on a shallow copy.
    """
    # Copy rather than update in place so the caller's dict is left untouched
    merged_kwargs = dict(override_kwargs) if override_kwargs is not None else {}
    if override_kwarg_keys is None:
        override_kwarg_keys = []

    # Use the current operation global kwargs, or generate defaults
    global_kwargs = host.current_op_global_arguments
    if not global_kwargs:
        global_kwargs, _ = pop_global_arguments({}, state, host)

    # Apply any current op kwargs that *weren't* found in the overrides
    merged_kwargs.update(
        {key: value for key, value in global_kwargs.items() if key not in override_kwarg_keys},
    )

    return {key: value for key, value in merged_kwargs.items() if key in CONNECTOR_ARGUMENT_KEYS}
def _handle_fact_kwargs(state, host, cls, args, kwargs):
    """
    Split the input for a fact call into fact arguments and global arguments.

    Returns a ``(fact_kwargs, global_kwargs)`` tuple. ``global_kwargs`` is what
    ``pop_global_arguments`` extracts from the current operation's global
    arguments overlaid with ``kwargs``. ``fact_kwargs`` holds the entries of
    ``kwargs`` whose keys are not in ``global_kwargs``; when there are any
    positional or fact keyword arguments they are bound to the parameters of
    the fact's ``command`` via ``getcallargs``.
    """
    positional = args or []
    keyword = kwargs or {}

    # Shallow copy of the current op's global arguments, overridden by the input kwargs
    merged = (host.current_op_global_arguments or {}).copy()
    merged.update(keyword)

    # Pop executor kwargs, whatever is left in the input belongs to the fact
    global_kwargs, _ = pop_global_arguments(merged, state=state, host=host)

    fact_kwargs = {}
    for key, value in keyword.items():
        if key not in global_kwargs:
            fact_kwargs[key] = value

    if positional or fact_kwargs:
        # Merge positional and keyword arguments into one kwargs dictionary
        fact_kwargs = getcallargs(cls().command, *positional, **fact_kwargs)

    return fact_kwargs, global_kwargs
def get_facts(state: "State", *args, **kwargs):
    """
    Load a fact on every active host in the inventory in parallel.

    One greenlet per host is spawned on ``state.pool``; each runs ``get_fact``
    with the state and that host set as the current context. Returns a dict
    mapping each host to its fact result.
    """

    def get_fact_with_context(state, host, *fact_args, **fact_kwargs):
        # Enter the state context first, then the host context
        with ctx_state.use(state), ctx_host.use(host):
            return get_fact(state, host, *fact_args, **fact_kwargs)

    greenlet_to_host = {}
    for active_host in state.inventory.iter_active_hosts():
        spawned = state.pool.spawn(get_fact_with_context, state, active_host, *args, **kwargs)
        greenlet_to_host[spawned] = active_host

    results = {}

    with progress_spinner(greenlet_to_host.values()) as progress:
        # Collect results in completion order, ticking the spinner per host
        for finished in gevent.iwait(greenlet_to_host.keys()):
            finished_host = greenlet_to_host[finished]
            results[finished_host] = finished.get()
            progress(finished_host)

    return results
def get_fact(
    state: "State",
    host: "Host",
    cls: type[FactBase],
    args: Optional[Any] = None,
    kwargs: Optional[Any] = None,
    ensure_hosts: Optional[Any] = None,
    apply_failed_hosts: bool = True,
) -> Any:
    """
    Load a fact for a single host, dispatching on the kind of fact class.

    Regular facts are loaded directly via ``_get_fact``; subclasses of
    ``ShortFactBase`` go through ``get_short_facts``. All other arguments are
    passed along unchanged.
    """
    if not issubclass(cls, ShortFactBase):
        return _get_fact(
            state,
            host,
            cls,
            args,
            kwargs,
            ensure_hosts,
            apply_failed_hosts,
        )

    # Short facts receive the same options as keywords
    return get_short_facts(
        state,
        host,
        cls,
        args=args,
        kwargs=kwargs,
        ensure_hosts=ensure_hosts,
        apply_failed_hosts=apply_failed_hosts,
    )
def _get_fact(
    state: "State",
    host: "Host",
    cls: type[FactBase],
    args: Optional[list] = None,
    kwargs: Optional[dict] = None,
    ensure_hosts: Optional[Any] = None,
    apply_failed_hosts: bool = True,
) -> Any:
    """
    Run a single fact on ``host`` and return its processed value.

    The fact class is instantiated, its command is built from the supplied
    arguments and executed through ``host.run_shell_command`` (connecting the
    host first if needed). On success with stdout output the result of
    ``fact.process`` is returned; in every other case ``fact.default()`` is
    returned.

    Args:
        state: provides the fact printing/logging flags and ``fail_hosts``.
        host: the host to run the fact command on.
        cls: the fact class to load.
        args: positional arguments for the fact command.
        kwargs: keyword arguments; may mix fact and global arguments.
        ensure_hosts: only included in the debug log message here.
        apply_failed_hosts: when true, a failed fact (without
            ``_ignore_errors``) marks the host as failed on ``state``.

    Raises:
        Presumably whatever ``host.connect`` raises, as it is called with
        ``raise_exceptions=True`` - confirm against the host implementation.
    """
    fact = cls()
    name = fact.name
    # Separate arguments for the fact command from the global arguments
    fact_kwargs, global_kwargs = _handle_fact_kwargs(state, host, cls, args, kwargs)
    kwargs_str = get_kwargs_str(fact_kwargs)
    logger.debug(
        "Getting fact: %s (%s) (ensure_hosts: %r)",
        name,
        kwargs_str,
        ensure_hosts,
    )
    if not host.connected:
        host.connect(
            reason=f"to load fact: {name} ({kwargs_str})",
            raise_exceptions=True,
        )
    # Facts can override the shell (winrm powershell vs cmd support)
    if fact.shell_executable:
        global_kwargs["_shell_executable"] = fact.shell_executable
    # Both attributes may be callables taking the fact arguments. The first call
    # removes any "self" entry from fact_kwargs in place.
    command = _make_command(fact.command, fact_kwargs)
    requires_command = _make_command(fact.requires_command, fact_kwargs)
    if requires_command:
        command = StringCommand(
            # Command doesn't exist, return 0 *or* run & return fact command
            "!",
            "command",
            "-v",
            requires_command,
            ">/dev/null",
            "||",
            command,
        )
    # Defaults used if running the command raises one of the errors handled below
    status = False
    output = CommandOutput([])
    # Only connector arguments are passed through to run_shell_command
    executor_kwargs = {
        key: value for key, value in global_kwargs.items() if key in CONNECTOR_ARGUMENT_KEYS
    }
    try:
        status, output = host.run_shell_command(
            command,
            print_output=state.print_fact_output,
            print_input=state.print_fact_input,
            **executor_kwargs,
        )
    except (timeout_error, socket_error, SSHException) as e:
        # Logged rather than re-raised: status stays False and output stays empty,
        # so the failure handling below takes over.
        log_host_command_error(
            host,
            e,
            timeout=global_kwargs["_timeout"],
        )
    stdout_lines, stderr_lines = output.stdout_lines, output.stderr_lines
    # Returned as-is unless the command succeeded and produced stdout
    data = fact.default()
    if status:
        if stdout_lines:
            data = fact.process(stdout_lines)
    elif stderr_lines:
        # If we have error output and that error is sudo or su stating the user
        # does not exist, do not fail but instead return the default fact value.
        # This allows for users that don't currently but may be created during
        # other operations.
        # NOTE(review): the direct key lookups below assume "_sudo_user" and
        # "_su_user" are always present in executor_kwargs - confirm.
        first_line = stderr_lines[0]
        if executor_kwargs["_sudo_user"] and re.match(SUDO_REGEX, first_line):
            status = True
        if executor_kwargs["_su_user"] and any(re.match(regex, first_line) for regex in SU_REGEXES):
            status = True
    if status:
        # NOTE(review): this message formats the raw input kwargs, while the
        # failure message below formats fact_kwargs.
        log_message = "{0}{1}".format(
            host.print_prefix,
            "Loaded fact {0}{1}".format(
                click.style(name, bold=True),
                f" ({get_kwargs_str(kwargs)})" if kwargs else "",
            ),
        )
        # Info level only when fact info printing is enabled, debug otherwise
        if state.print_fact_info:
            logger.info(log_message)
        else:
            logger.debug(log_message)
    else:
        # Show the command output now, unless fact output printing is enabled
        if not state.print_fact_output:
            print_host_combined_output(host, output)
        log_error_or_warning(
            host,
            global_kwargs["_ignore_errors"],
            description=("could not load fact: {0} {1}").format(name, get_kwargs_str(fact_kwargs)),
        )
    # Check we've not failed
    if apply_failed_hosts and not status and not global_kwargs["_ignore_errors"]:
        state.fail_hosts({host})
    return data
def _get_fact_hash(state: "State", host: "Host", cls, args, kwargs):
    """
    Compute a hash identifying a fact call: the fact class plus the fact and
    global arguments resolved for it on this host.

    Short facts are hashed as the fact they are built on.
    """
    fact_cls = cls.fact if issubclass(cls, ShortFactBase) else cls
    resolved_fact_kwargs, resolved_global_kwargs = _handle_fact_kwargs(
        state,
        host,
        fact_cls,
        args,
        kwargs,
    )
    hash_input = (fact_cls, resolved_fact_kwargs, resolved_global_kwargs)
    return make_hash(hash_input)
def get_host_fact(
    state: "State",
    host: "Host",
    cls,
    args: Optional[Iterable] = None,
    kwargs: Optional[dict] = None,
) -> Any:
    """
    Load a fact for one host; a thin wrapper around ``get_fact`` that leaves
    all of its remaining options at their defaults.
    """
    fact_value = get_fact(state, host, cls, args, kwargs)
    return fact_value