-
Notifications
You must be signed in to change notification settings - Fork 534
/
__init__.py
3556 lines (2910 loc) · 133 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Copyright (C) 2006 Steve Conklin <sconklin@redhat.com>
# This file is part of the sos project: https://github.com/sosreport/sos
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# version 2 of the GNU General Public License.
#
# See the LICENSE file in the source distribution for further information.
# pylint: disable=too-many-locals,too-many-branches
""" This exports methods available for use by plugins for sos """
import contextlib
import os
import glob
import re
import stat
from time import time
import logging
import fnmatch
import errno
import textwrap
from datetime import datetime
from sos.utilities import (sos_get_command_output, import_module, grep,
fileobj, tail, is_executable, TIMEOUT_DEFAULT,
path_exists, path_isdir, path_isfile, path_islink,
listdir, path_join, bold, file_is_binary,
recursive_dict_values_by_key)
from sos.archive import P_FILE, P_LINK
def regex_findall(regex, fname):
"""Return a list of all non overlapping matches in the string(s)"""
try:
with fileobj(fname) as f:
return re.findall(regex, f.read(), re.MULTILINE)
except AttributeError:
return []
def _mangle_command(command, name_max):
mangledname = re.sub(r"^/(usr/|)(bin|sbin)/", "", command)
mangledname = re.sub(r"[^\w\-\.\/]+", "_", mangledname)
mangledname = re.sub(r"/", ".", mangledname).strip(" ._-")
mangledname = mangledname[0:name_max]
return mangledname
def _node_type(st):
""" return a string indicating the type of special node represented by
the stat buffer st (block, character, fifo, socket).
"""
_types = [
(stat.S_ISBLK, "block device"),
(stat.S_ISCHR, "character device"),
(stat.S_ISFIFO, "named pipe"),
(stat.S_ISSOCK, "socket")
]
for t in _types:
if t[0](st.st_mode):
return t[1]
return ''
_certmatch = re.compile("----(?:-| )BEGIN.*?----(?:-| )END", re.DOTALL)
_cert_replace = "-----SCRUBBED"
class SoSPredicate(object):
"""A class to implement collection predicates.
A predicate gates the collection of data by an sos plugin. For any
`add_cmd_output()`, `add_copy_spec()` or `add_journal()` call, the
passed predicate will be evaulated and collection will proceed if
the result is `True`, and not otherwise.
Predicates may be used to control conditional data collection
without the need for explicit conditional blocks in plugins.
:param owner: The ``Plugin`` object creating the predicate
:type owner: ``Plugin``
:param dry_run: Is sos running in dry_run mode?
:type dry_run: ``bool``
:param kmods: Kernel module name(s) to check presence of
:type kmods: ``list``, or ``str`` of single name
:param services: Service name(s) to check if running
:type services: ``list``, or ``str`` of single name
:param packages: Package name(s) to check presence of
:type packages: ``list``, or ``str`` of single name
:param cmd_outputs: Command to run, with output string to check
:type cmd_outputs: ``list`` of ``dict``s, or single ``dict`` taking form
{'cmd': <command to run>,
'output': <string that output should contain>}
:param arch: Architecture(s) that the local system is matched
against
:type arch: ``list``, or ``str`` of single architecture
:param required: For each parameter provided, should the checks
require all items, no items, or any items provided
:type required: ``dict``, with keys matching parameter names and values
being either 'any', 'all', or 'none. Default 'any'.
"""
#: The plugin that owns this predicate
_owner = None
#: Skip all collection?
dry_run = False
#: Kernel module enablement list
kmods = []
#: Services enablement list
services = []
#: Package presence list
packages = []
# Command output inclusion pairs {'cmd': 'foo --help', 'output': 'bar'}
cmd_outputs = []
#: Allowed architecture(s) of the system
arch = []
def __str(self, quote=False, prefix="", suffix=""):
"""Return a string representation of this SoSPredicate with
optional prefix, suffix and value quoting.
"""
quotes = '"%s"'
pstr = f"dry_run={self.dry_run}, "
kmods = self.kmods
kmods = [quotes % k for k in kmods] if quote else kmods
pstr += f"kmods=[{','.join(kmods)}], "
services = self.services
services = [quotes % s for s in services] if quote else services
pstr += f"services=[{','.join(services)}], "
pkgs = self.packages
pkgs = [quotes % p for p in pkgs] if quote else pkgs
pstr += f"packages=[{','.join(pkgs)}], "
cmdoutputs = [
f"{{ {quotes % 'cmd'}: {quotes % cmdoutput['cmd']}, "
f"{quotes % 'output'}: {quotes % cmdoutput['output']} }}"
for cmdoutput in self.cmd_outputs
]
pstr += f"cmdoutputs=[{','.join(cmdoutputs)}], "
arches = self.arch
arches = [quotes % a for a in arches] if quote else arches
pstr += f"arches=[{','.join(arches)}]"
return prefix + pstr + suffix
def __str__(self):
"""Return a string representation of this SoSPredicate.
"dry_run=False, kmods=[], services=[], cmdoutputs=[]"
"""
return self.__str()
def __repr__(self):
"""Return a machine readable string representation of this
SoSPredicate.
"SoSPredicate(dry_run=False, kmods=[], services=[], cmdoutputs=[])"
"""
return self.__str(quote=True, prefix="SoSPredicate(", suffix=")")
def _check_required_state(self, items, required):
"""Helper to simplify checking the state of the predicate's evaluations
against the setting of the required state of that evaluation
"""
if required == 'any':
return any(items)
elif required == 'all':
return all(items)
elif required == 'none':
return not any(items)
raise ValueError(
f"predicate requires must be 'any', 'all', or 'none' "
f"not {required}"
)
def _failed_or_forbidden(self, test, item):
"""Helper to direct failed predicates to provide the proper messaging
based on the required check type
:param test: The type of check we're doing, e.g. kmods, arch
:param item: The string of what failed
"""
_req = self.required[test]
if _req != 'none':
self._failed[test].append(item)
else:
self._forbidden[test].append(item)
def _eval_kmods(self):
if not self.kmods or self._owner.get_option('allow_system_changes'):
return True
_kmods = []
# Are kernel modules loaded?
for kmod in self.kmods:
res = self._owner.is_module_loaded(kmod)
_kmods.append(res)
if not res:
self._failed_or_forbidden('kmods', kmod)
return self._check_required_state(_kmods, self.required['kmods'])
def _eval_services(self):
if not self.services:
return True
_svcs = []
for svc in self.services:
res = self._owner.is_service_running(svc)
_svcs.append(res)
if not res:
self._failed_or_forbidden('services', svc)
return self._check_required_state(_svcs, self.required['services'])
def _eval_packages(self):
if not self.packages:
return True
_pkgs = []
for pkg in self.packages:
res = self._owner.is_installed(pkg)
_pkgs.append(res)
if not res:
self._failed_or_forbidden('packages', pkg)
return self._check_required_state(_pkgs, self.required['packages'])
def _eval_cmd_output(self, cmd_output):
"""Does 'cmd' output contain string 'output'?"""
if 'cmd' not in cmd_output or 'output' not in cmd_output:
return False
result = sos_get_command_output(cmd_output['cmd'])
if result['status'] != 0:
return False
for line in result['output'].splitlines():
if cmd_output['output'] in line:
return True
return False
def _eval_cmd_outputs(self):
if not self.cmd_outputs:
return True
_cmds = []
for cmd in self.cmd_outputs:
res = self._eval_cmd_output(cmd)
_cmds.append(res)
if not res:
self._failed_or_forbidden(
'cmd_outputs',
f"{cmd['cmd']}: {cmd['output']}"
)
return self._check_required_state(_cmds, self.required['cmd_outputs'])
def _eval_arch(self):
if not self.arch:
return True
# a test for 'all' against arch does not make sense, so only test to
# see if the system's reported architecture is in the last of 'allowed'
# arches requested by the predicate
_arch = self._owner.policy.get_arch()
regex = f'(?:{"|".join(self.arch)})'
if self.required['arch'] == 'none':
if re.match(regex, _arch):
self._forbidden['architecture'].append(_arch)
return False
return True
if re.match(regex, _arch):
return True
self._failed['architecture'].append(_arch)
return False
def _report_failed(self):
"""Return a string informing user what caused the predicate to fail
evaluation
"""
msg = ''
_substr = "required %s missing: %s."
for key, val in self._failed.items():
if not val:
continue
val = set(val)
msg += _substr % (key, ', '.join(v for v in val))
return msg
def _report_forbidden(self):
"""Return a string informing the user that a forbidden condition exists
which caused the predicate to fail
"""
msg = ''
_substr = "forbidden %s '%s' found."
for key, val in self._forbidden.items():
if not val:
continue
val = set(val)
msg += _substr % (key, ', '.join(v for v in val))
return msg
def report_failure(self):
"""Used by `Plugin()` to obtain the error string based on if the reason
was a failed check or a forbidden check
"""
msg = [
self._report_failed(),
self._report_forbidden(),
'(dry run)' if self.dry_run else ''
]
return " ".join(msg).lstrip()
def __bool__(self):
"""Predicate evaluation hook.
"""
# Null predicate?
if not any([self.kmods, self.services, self.packages, self.cmd_outputs,
self.arch, self.dry_run]):
return True
return ((self._eval_kmods() and self._eval_services() and
self._eval_packages() and self._eval_cmd_outputs() and
self._eval_arch())
and not self.dry_run)
def __init__(self, owner, dry_run=False, kmods=[], services=[],
packages=[], cmd_outputs=[], arch=[], required={}):
"""Initialise a new SoSPredicate object
"""
self._owner = owner
self.kmods = list(kmods)
self.services = list(services)
self.packages = list(packages)
self.arch = list(arch)
if not isinstance(cmd_outputs, list):
cmd_outputs = [cmd_outputs]
self.cmd_outputs = cmd_outputs
self.dry_run = dry_run | self._owner.commons['cmdlineopts'].dry_run
self.required = {'kmods': 'any', 'services': 'any', 'packages': 'any',
'cmd_outputs': 'any', 'arch': 'any'}
self.required.update({
k: v for k, v in required.items() if
required[k] != self.required[k]
})
#: Dict holding failed evaluations
self._failed = {
'kmods': [], 'services': [], 'packages': [], 'cmd_outputs': [],
'architecture': []
}
self._forbidden = {
'kmods': [], 'services': [], 'packages': [], 'cmd_outputs': [],
'architecture': []
}
class SoSCommand(object):
"""A class to represent a command to be collected.
A SoSCommand() object is instantiated for each command handed to an
_add_cmd_output() call, so that we no longer need to pass around a very
long tuple to handle the parameters.
Any option supported by _add_cmd_output() is passed to the SoSCommand
object and converted to an attribute. SoSCommand.__dict__ is then passed to
_get_command_output_now() for each command to be collected.
"""
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
def __str__(self):
"""Return a human readable string representation of this SoSCommand
"""
return ', '.join(f"{param}={val}" for (param, val) in
sorted(self.__dict__.items()))
class PluginOpt():
"""This is used to define options available to plugins. Plugins will need
to define options alongside their distro-specific classes in order to add
support for user-controlled changes in Plugin behavior.
:param name: The name of the plugin option
:type name: ``str``
:param default: The default value of the option
:type default: Any
:param desc: A short description of the effect of the option
:type desc: ``str``
:param long_desc: A detailed description of the option. Will be used by
`sos info`
:type long_desc: ``str``
:param val_type: The type of object the option accepts for values. If
not provided, auto-detect from the type of ``default``
:type val_type: A single type or a ``list`` of types
"""
name = ''
default = None
enabled = False
desc = ''
long_desc = ''
value = None
val_type = [None]
plugin = ''
def __init__(self, name='undefined', default=None, desc='', long_desc='',
val_type=None):
self.name = name
self.default = default
self.desc = desc
self.long_desc = long_desc
self.value = self.default
if val_type is not None:
if not isinstance(val_type, list):
val_type = [val_type]
else:
val_type = [default.__class__]
self.val_type = val_type
def __str__(self):
items = [
f'name={self.name}',
f'desc=\'{self.desc}\'',
f'value={self.value}',
f'default={self.default}'
]
return '(' + ', '.join(items) + ')'
def __repr__(self):
return self.__str__()
def set_value(self, val):
# 'str' type accepts any value, incl. numbers
if type('') in self.val_type:
self.value = str(val)
return
if not any([type(val) is _t for _t in self.val_type]):
valid = []
for t in self.val_type:
if t is None:
continue
if t.__name__ == 'bool':
valid.append("boolean true/false (on/off, etc)")
elif t.__name__ == 'str':
valid.append("string (no spaces)")
elif t.__name__ == 'int':
valid.append("integer values")
raise Exception(
f"Plugin option '{self.plugin}.{self.name}' takes "
f"{', '.join(valid)}, not {type(val).__name__}"
)
self.value = val
class Plugin():
"""This is the base class for sos report plugins. Plugins should subclass
this and set the class variables where applicable.
:param commons: A set of information that is shared internally so that
plugins may access the same dataset. This is provided
automatically by sos
:type commons: ``dict``
Each `Plugin()` subclass should also subclass at least one tagging class,
e.g. ``RedHatPlugin``, to support that distribution. If different
distributions require different collections, each distribution should have
its own subclass of the Plugin that also subclasses the tagging class for
their respective distributions.
:cvar plugin_name: The name of the plugin, will be returned by `name()`
:vartype plugin_name: ``str``
:cvar packages: Package name(s) that, if installed, enable this plugin
:vartype packages: ``tuple``
:cvar files: File path(s) that, if present, enable this plugin
:vartype files: ``tuple``
:cvar commands: Executables that, if present, enable this plugin
:vartype commands: ``tuple``
:cvar kernel_mods: Kernel module(s) that, if loaded, enable this plugin
:vartype kernel_mods: ``tuple``
:cvar services: Service name(s) that, if running, enable this plugin
:vartype services: ``tuple``
:cvar architectures: Architecture(s) this plugin is enabled for. Defaults
to 'none' to enable on all arches.
:vartype architectures: ``tuple``, or ``None``
:cvar profiles: Name(s) of profile(s) this plugin belongs to
:vartype profiles: ``tuple``
:cvar plugin_timeout: Timeout in seconds for this plugin as a whole
:vartype plugin_timeout: ``int``
:cvar cmd_timeout: Timeout in seconds for individual commands
:vartype cmd_timeout: ``int``
"""
plugin_name = None
packages = ()
files = ()
commands = ()
kernel_mods = ()
services = ()
containers = ()
architectures = None
archive = None
profiles = ()
sysroot = '/'
plugin_timeout = TIMEOUT_DEFAULT
cmd_timeout = TIMEOUT_DEFAULT
_timeout_hit = False
cmdtags = {}
filetags = {}
option_list = []
# Default predicates
predicate = None
cmd_predicate = None
short_desc = "<no description available>"
def __init__(self, commons):
self.copied_files = []
self.executed_commands = []
self._env_vars = set()
self.alerts = []
self.custom_text = ""
self.commons = commons
self.forbidden_paths = []
self.copy_paths = set()
self.container_copy_paths = []
self.copy_strings = []
self.collect_cmds = []
self.options = {}
self.sysroot = commons['sysroot']
self.policy = commons['policy']
self.devices = commons['devices']
self.manifest = None
self.skip_files = commons['cmdlineopts'].skip_files
self.skip_commands = commons['cmdlineopts'].skip_commands
self.default_environment = {}
self._tail_files_list = []
self.soslog = self.commons['soslog'] if 'soslog' in self.commons \
else logging.getLogger('sos')
# add the default plugin opts
self.options.update(self.get_default_plugin_opts())
for popt in self.options:
self.options[popt].plugin = self.name()
for opt in self.option_list:
opt.plugin = self.name()
self.options[opt.name] = opt
# Initialise the default --dry-run predicate
self.set_predicate(SoSPredicate(self))
def get_default_plugin_opts(self):
return {
'timeout': PluginOpt(
'timeout', default=-1, val_type=int,
desc='Timeout in seconds for plugin to finish all collections'
),
'cmd-timeout': PluginOpt(
'cmd-timeout', default=-1, val_type=int,
desc='Timeout in seconds for individual commands to finish'
),
'postproc': PluginOpt(
'postproc', default=True, val_type=bool,
desc='Enable post-processing of collected data'
)
}
def set_plugin_manifest(self, manifest):
"""Pass in a manifest object to the plugin to write to
:param manifest: The manifest that the plugin will add metadata to
:type manifest: ``SoSManifest``
"""
self.manifest = manifest
# add these here for organization when they actually get set later
self.manifest.add_field('start_time', '')
self.manifest.add_field('end_time', '')
self.manifest.add_field('run_time', '')
self.manifest.add_field('setup_start', '')
self.manifest.add_field('setup_end', '')
self.manifest.add_field('setup_time', '')
self.manifest.add_field('timeout', self.timeout)
self.manifest.add_field('timeout_hit', False)
self.manifest.add_field('command_timeout', self.cmdtimeout)
self.manifest.add_list('commands', [])
self.manifest.add_list('files', [])
self.manifest.add_field('strings', {})
self.manifest.add_field('containers', {})
self.manifest.add_list('collections', [])
def set_default_cmd_environment(self, env_vars):
"""
Specify a collection of environment variables that should always be
passed to commands being executed by this plugin.
:param env_vars: The environment variables and their values to set
:type env_vars: ``dict{ENV_VAR_NAME: ENV_VAR_VALUE}``
"""
if not isinstance(env_vars, dict):
raise TypeError(
"Environment variables for Plugin must be specified by dict"
)
self.default_environment = env_vars
self._log_debug("Default environment for all commands now set to "
f"{self.default_environment}")
def add_default_cmd_environment(self, env_vars):
"""
Add or modify a specific environment variable in the set of default
environment variables used by this Plugin.
:param env_vars: The environment variables to add to the current
set of env vars in use
:type env_vars: ``dict``
"""
if not isinstance(env_vars, dict):
raise TypeError("Environment variables must be added via dict")
self._log_debug(f"Adding {env_vars} to default environment")
self.default_environment.update(env_vars)
def _get_cmd_environment(self, env=None):
"""
Get the merged set of environment variables for a command about to be
executed by this plugin.
:returns: The set of env vars to use for a command
:rtype: ``dict``
"""
if env is None:
return self.default_environment
if not isinstance(env, dict):
raise TypeError("Command env vars must be passed as dict")
_env = self.default_environment.copy()
_env.update(env)
return _env
def timeout_from_options(self, optname, plugoptname, default_timeout):
"""
Get the timeout value for either the plugin or a command, as
determined by either the value provided via the
plugin.timeout or plugin.cmd-timeout option, the global timeout or
cmd-timeout options, or the default value set by the plugin or the
collection, in that order of precendence.
:param optname: The name of the cmdline option being checked, either
'plugin_timeout' or 'timeout'
:type optname: ``str``
:param plugoptname: The name of the plugin option name being checked,
either 'timeout' or 'cmd-timeout'
:type plugoptname: ``str``
:param default_timeout: The timeout to default to if determination is
inconclusive or hits an error
:type default_timeout: ``int``
:returns: The timeout value in seconds
:rtype: ``int``
"""
_timeout = None
try:
opt_timeout = self.get_option(optname)
own_timeout = int(self.get_option(plugoptname))
if opt_timeout is None:
_timeout = own_timeout
elif opt_timeout is not None and own_timeout == -1:
if opt_timeout == TIMEOUT_DEFAULT:
_timeout = default_timeout
else:
_timeout = int(opt_timeout)
elif opt_timeout is not None and own_timeout > -1:
_timeout = own_timeout
else:
return None
except ValueError:
return default_timeout # Default to known safe value
if _timeout is not None and _timeout > -1:
return _timeout
return default_timeout
@property
def timeout(self):
"""Returns either the default plugin timeout value, the value as
provided on the commandline via -k plugin.timeout=value, or the value
of the global --plugin-timeout option.
"""
_timeout = self.timeout_from_options('plugin_timeout', 'timeout',
self.plugin_timeout)
return _timeout
@property
def cmdtimeout(self):
"""Returns either the default command timeout value, the value as
provided on the commandline via -k plugin.cmd-timeout=value, or the
value of the global --cmd-timeout option.
"""
_cmdtimeout = self.timeout_from_options('cmd_timeout', 'cmd-timeout',
self.cmd_timeout)
return _cmdtimeout
def set_timeout_hit(self):
self._timeout_hit = True
self.manifest.add_field('end_time', datetime.now())
self.manifest.add_field('timeout_hit', True)
def check_timeout(self):
"""
Checks to see if the plugin has hit its timeout.
This is set when the sos.collect_plugin() method hits a timeout and
terminates the thread. From there, a Popen() call can still continue to
run, and we need to manually terminate it. Thus, check_timeout() should
only be called in sos_get_command_output().
Since sos_get_command_output() is not plugin aware, this method is
handed to that call to use as a polling method, to avoid passing the
entire plugin object.
:returns: ``True`` if timeout has been hit, else ``False``
:rtype: ``bool``
"""
return self._timeout_hit
@classmethod
def name(cls):
"""Get the name of the plugin
:returns: The name of the plugin, in lowercase
:rtype: ``str``
"""
if cls.plugin_name:
return cls.plugin_name
return cls.__name__.lower()
@classmethod
def display_help(cls, section):
if cls.plugin_name is None:
cls.display_self_help(section)
else:
cls.display_plugin_help(section)
@classmethod
def display_plugin_help(cls, section):
from sos.help import TERMSIZE
section.set_title(f"{cls.plugin_name.title()} Plugin Information - "
f"{cls.short_desc}")
missing = '\nDetailed information is not available for this plugin.\n'
# Concatenate the docstrings of distro-specific plugins with their
# base classes, if available.
try:
_doc = ''
_sc = cls.__mro__[1]
if _sc != Plugin and _sc.__doc__:
_doc = _sc.__doc__
if cls.__doc__:
_doc += cls.__doc__
except Exception:
_doc = None
section.add_text(f'\n {_doc if _doc else missing}')
if not any([cls.packages, cls.commands, cls.files, cls.kernel_mods,
cls.services, cls.containers]):
section.add_text("This plugin is always enabled by default.")
else:
for trig in ['packages', 'commands', 'files', 'kernel_mods',
'services']:
if getattr(cls, trig, None):
section.add_text(
f"Enabled by {trig}: {', '.join(getattr(cls, trig))}",
newline=False
)
if getattr(cls, 'containers'):
section.add_text(
"Enabled by containers with names matching: "
f"{', '.join(c for c in cls.containers)}",
newline=False
)
if cls.profiles:
section.add_text(
"Enabled with the following profiles: "
f"{', '.join(p for p in cls.profiles)}",
newline=False
)
if hasattr(cls, 'verify_packages'):
# pylint: disable=no-member
section.add_text(
"\nVerfies packages (when using --verify): "
f"{', '.join(pkg for pkg in cls.verify_packages)}",
newline=False,
)
if cls.postproc is not Plugin.postproc:
section.add_text(
'This plugin performs post-processing on potentially '
'sensitive collections. Disabling post-processing may'
' leave sensitive data in plaintext.'
)
if not cls.option_list:
return
optsec = section.add_section('Plugin Options')
optsec.add_text(
"These options may be toggled or changed using "
f"'{bold(f'-k {cls.plugin_name}.option_name=$value')}'"
)
optsec.add_text(
bold((f"\n{' ':<4}{'Option Name':<20}{'Default':<30}"
f"{'Description':<20}")),
newline=False
)
opt_indent = ' ' * 54
for opt in cls.option_list:
_def = opt.default
# convert certain values to text meanings
if _def is None or _def == '':
_def = "None/Unset"
if isinstance(opt.default, bool):
if opt.default:
_def = "True/On"
else:
_def = "False/Off"
_ln = f"{' ':<4}{opt.name:<20}{_def:<30}{opt.desc:<20}"
optsec.add_text(
textwrap.fill(_ln, width=TERMSIZE,
subsequent_indent=opt_indent),
newline=False
)
if opt.long_desc:
_size = TERMSIZE - 10
space = ' ' * 8
optsec.add_text(
textwrap.fill(opt.long_desc, width=_size,
initial_indent=space,
subsequent_indent=space),
newline=False
)
@classmethod
def display_self_help(cls, section):
section.set_title("SoS Plugin Detailed Help")
section.add_text(
"Plugins are what define what collections occur for a given "
f"{bold('sos report')} execution. Plugins are generally "
"representative of a single system component (e.g. kernel), "
"package (e.g. podman), or similar construct. Plugins will "
"typically specify multiple files or directories to copy, as well"
" as commands to execute and collect the output of for further "
"analysis."
)
subsec = section.add_section('Plugin Enablement')
subsec.add_text(
'Plugins will be automatically enabled based on one of several '
'triggers - a certain package being installed, a command or file '
'existing, a kernel module being loaded, etc...'
)
subsec.add_text(
"Plugins may also be enabled or disabled by name using the "
f"{bold('-e $name')} or {bold('-n $name')} options respectively."
)
subsec.add_text(
"Certain plugins may only be available for specific distributions "
"or may behave differently on different distributions based on how"
" the component for that plugin is installed or how it operates."
f" When using {bold('sos help report.plugins.$plugin')}, help will"
" be displayed for the version of the plugin appropriate for your "
"distribution."
)
optsec = section.add_section('Using Plugin Options')
optsec.add_text(
"Many plugins support additional options to enable/disable or in "
"some other way modify the collections it makes. Plugin options "
f"are set using the {bold('-k $plugin_name.$option_name=$value')} "
"syntax. Options that are on/off toggles may exclude setting a "
"value, which will be interpreted as enabling that option.\n\nSee"
f" specific plugin help sections or {bold('sos report -l')} for "
"more information on these options"
)
seealso = section.add_section('See Also')
_also = {
'report.plugins.$plugin': 'Help for a specific $plugin',
'policies': 'Information on distribution policies'
}
seealso.add_text(
"Additional relevant information may be available in these "
"help sections:\n\n%s" % "\n".join(
f"{' ':>8}{sec:<30}{desc:<30}"
for sec, desc in _also.items()
), newline=False
)
def _format_msg(self, msg):
# safeguard against non-UTF logging, see #2790 for reference
return (f"[plugin:{self.name()}] "
f"{msg.encode('utf-8', 'replace').decode()}")
def _log_error(self, msg):
self.soslog.error(self._format_msg(msg))
def _log_warn(self, msg):
self.soslog.warning(self._format_msg(msg))
def _log_info(self, msg):
self.soslog.info(self._format_msg(msg))
def _log_debug(self, msg):
self.soslog.debug(self._format_msg(msg))
def strip_sysroot(self, path):
"""Remove the configured sysroot from a filesystem path
:param path: The filesystem path to strip sysroot from
:type path: ``str``
:returns: The stripped filesystem path
:rtype: ``str``
"""
if not self.use_sysroot():
return path
if self.sysroot and path.startswith(self.sysroot):
return path[len(self.sysroot):]
return path
def use_sysroot(self):
"""Determine if the configured sysroot needs to be used
:returns: ``True`` if sysroot is not `/`, else ``False``
:rtype: ``bool``
"""
return self.sysroot != os.path.abspath(os.sep)
def tmp_in_sysroot(self):
"""Check if sysroot is within the archive's temp directory
:returns: ``True`` if sysroot is in the archive's temp directory, else
``False``
:rtype: ``bool``
"""
# if sysroot is still None, that implies '/'
_sysroot = self.sysroot or '/'
paths = [_sysroot, self.archive.get_tmp_dir()]
return os.path.commonprefix(paths) == _sysroot
def is_installed(self, package_name):
"""Is the package $package_name installed?
:param package_name: The name of the package to check
:type package_name: ``str``
:returns: ``True`` id the package is installed, else ``False``
:rtype: ``bool``
"""
return (
len(self.policy.package_manager.all_pkgs_by_name(package_name)) > 0
)