/
utils.py
672 lines (539 loc) · 26.8 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
#
# Copyright (C) 2019 Red Hat, Inc.
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# the GNU General Public License v.2, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY expressed or implied, including the implied warranties of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details. You should have received a copy of the
# GNU General Public License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA. Any Red Hat trademarks that are incorporated in the
# source code or documentation are not subject to the GNU General Public
# License and may only be used or replicated with the express permission of
# Red Hat, Inc.
#
import gi
gi.require_version("BlockDev", "2.0")
from gi.repository import BlockDev as blockdev
from collections import defaultdict
from blivet import arch, util
from blivet.devicefactory import get_device_type
from blivet.size import Size
from pyanaconda import isys
from pyanaconda.core.configuration.anaconda import conf
from pyanaconda.core.constants import productName, STORAGE_REFORMAT_BLACKLIST, \
STORAGE_REFORMAT_WHITELIST, STORAGE_MIN_PARTITION_SIZES, STORAGE_MIN_RAM, \
STORAGE_SWAP_IS_RECOMMENDED, STORAGE_MUST_BE_ON_ROOT, STORAGE_MUST_BE_ON_LINUXFS, \
STORAGE_LUKS2_MIN_RAM, STORAGE_ROOT_DEVICE_TYPES, STORAGE_REQ_PARTITION_SIZES, \
STORAGE_MUST_NOT_BE_ON_ROOT
from pyanaconda.core.i18n import _
from pyanaconda.core.storage import DEVICE_TEXT_MAP
from pyanaconda.modules.storage.platform import platform
from pyanaconda.anaconda_loggers import get_module_logger
log = get_module_logger(__name__)
def verify_root(storage, constraints, report_error, report_warning):
""" Verify the root.
:param storage: a storage to check
:param constraints: a dictionary of constraints
:param report_error: a function for error reporting
:param report_warning: a function for warning reporting
"""
root = storage.fsset.root_device
if not root:
report_error(_("You have not defined a root partition (/), "
"which is required for installation of %s"
" to continue.") % (productName,))
if root and root.format.exists and root.format.mountable and root.format.mountpoint == "/":
report_error(_("You must create a new file system on the root device."))
if storage.root_device and constraints[STORAGE_ROOT_DEVICE_TYPES]:
device_type = get_device_type(storage.root_device)
device_types = constraints[STORAGE_ROOT_DEVICE_TYPES]
if device_type not in device_types:
report_error(_("Your root partition must be on a device of type: %s.")
% ", ".join(DEVICE_TEXT_MAP[t] for t in device_types))
def verify_s390_constraints(storage, constraints, report_error, report_warning):
""" Verify constraints for s390x.
Prevent users from installing on s390x with LDL DASD disks.
:param storage: a storage to check
:param constraints: a dictionary of constraints
:param report_error: a function for error reporting
:param report_warning: a function for warning reporting
"""
if not arch.is_s390():
return
for disk in storage.disks:
if disk.type == "dasd" and blockdev.s390.dasd_is_ldl(disk.name):
report_error(_("The LDL DASD disk {name} ({busid}) cannot be used "
"for the installation. Please format it.")
.format(name="/dev/" + disk.name, busid=disk.busid))
def verify_partition_formatting(storage, constraints, report_error, report_warning):
""" Verify partitions that should be reformatted by default.
:param storage: a storage to check
:param constraints: a dictionary of constraints
:param report_error: a function for error reporting
:param report_warning: a function for warning reporting
"""
mountpoints = [
mount for mount, device in storage.mountpoints.items()
if device.format.exists
and device.format.linux_native
and not any(filter(mount.startswith, constraints[STORAGE_REFORMAT_BLACKLIST]))
and any(filter(mount.startswith, constraints[STORAGE_REFORMAT_WHITELIST]))
]
for mount in mountpoints:
report_warning(_("It is recommended to create a new file system on your "
"%(mount)s partition.") % {'mount': mount})
def verify_partition_sizes(storage, constraints, report_error, report_warning):
""" Verify the minimal and required partition sizes.
:param storage: a storage to check
:param constraints: a dictionary of constraints
:param report_error: a function for error reporting
:param report_warning: a function for warning reporting
"""
filesystems = storage.mountpoints
for (mount, size) in constraints[STORAGE_MIN_PARTITION_SIZES].items():
if mount in filesystems and filesystems[mount].size < size:
report_warning(_("Your %(mount)s partition is less than "
"%(size)s which is lower than recommended "
"for a normal %(productName)s install.")
% {'mount': mount, 'size': size,
'productName': productName})
for (mount, size) in constraints[STORAGE_REQ_PARTITION_SIZES].items():
if mount in filesystems and filesystems[mount].size < size:
report_error(_("Your %(mount)s partition size is lower "
"than required %(size)s.")
% {'mount': mount, 'size': size})
def verify_partition_format_sizes(storage, constraints, report_error, report_warning):
""" Verify that the size of the device is allowed by the format used.
:param storage: a storage to check
:param constraints: a dictionary of constraints
:param report_error: a function for error reporting
:param report_warning: a function for warning reporting
"""
# storage.mountpoints is a property that returns a new dict each time, so
# iterating over it is thread-safe.
filesystems = storage.mountpoints
for (mount, device) in filesystems.items():
problem = filesystems[mount].check_size()
if problem < 0:
report_error(_("Your %(mount)s partition is too small for "
"%(format)s formatting (allowable size is "
"%(minSize)s to %(maxSize)s)")
% {"mount": mount, "format": device.format.name,
"minSize": device.min_size, "maxSize": device.max_size})
elif problem > 0:
report_error(_("Your %(mount)s partition is too large for "
"%(format)s formatting (allowable size is "
"%(minSize)s to %(maxSize)s)")
% {"mount": mount, "format": device.format.name,
"minSize": device.min_size, "maxSize": device.max_size})
def verify_bootloader(storage, constraints, report_error, report_warning):
""" Verify that the size of the device is allowed by the format used.
:param storage: a storage to check
:param constraints: a dictionary of constraints
:param report_error: a function for error reporting
:param report_warning: a function for warning reporting
"""
if storage.bootloader and not storage.bootloader.skip_bootloader:
stage1 = storage.bootloader.stage1_device
if not stage1:
report_error(_("No valid boot loader target device found. "
"See below for details."))
pe = platform.stage1_missing_error
if pe:
report_error(_(pe))
else:
storage.bootloader.is_valid_stage1_device(stage1)
for msg in storage.bootloader.errors:
report_error(msg)
for msg in storage.bootloader.warnings:
report_warning(msg)
stage2 = storage.bootloader.stage2_device
if stage1 and not stage2:
report_error(_("You have not created a bootable partition."))
else:
storage.bootloader.is_valid_stage2_device(stage2)
for msg in storage.bootloader.errors:
report_error(msg)
for msg in storage.bootloader.warnings:
report_warning(msg)
if not storage.bootloader.check():
for msg in storage.bootloader.errors:
report_error(msg)
def verify_gpt_biosboot(storage, constraints, report_error, report_warning):
""" Verify that GPT boot disk on BIOS system has a BIOS boot partition.
:param storage: a storage to check
:param constraints: a dictionary of constraints
:param report_error: a function for error reporting
:param report_warning: a function for warning reporting
"""
if storage.bootloader and not storage.bootloader.skip_bootloader:
stage1 = storage.bootloader.stage1_device
if arch.is_x86() and not arch.is_efi() and stage1 and stage1.is_disk \
and getattr(stage1.format, "label_type", None) == "gpt":
missing = True
for part in [p for p in storage.partitions if p.disk == stage1]:
if part.format.type == "biosboot":
missing = False
break
if missing:
report_error(_("Your BIOS-based system needs a special "
"partition to boot from a GPT disk label. "
"To continue, please create a 1MiB "
"'biosboot' type partition."))
def verify_swap(storage, constraints, report_error, report_warning):
""" Verify the existence of swap.
:param storage: a storage to check
:param constraints: a dictionary of constraints
:param report_error: a function for error reporting
:param report_warning: a function for warning reporting
"""
swaps = storage.fsset.swap_devices
if not swaps:
installed = util.total_memory()
required = constraints[STORAGE_MIN_RAM] + Size("{} MiB".format(isys.NO_SWAP_EXTRA_RAM))
if not constraints[STORAGE_SWAP_IS_RECOMMENDED]:
if installed < required:
report_warning(_("You have not specified a swap partition. "
"%(requiredMem)s of memory is recommended to continue "
"installation without a swap partition, but you only "
"have %(installedMem)s.")
% {"requiredMem": required, "installedMem": installed})
else:
if installed < required:
report_error(_("You have not specified a swap partition. "
"%(requiredMem)s of memory is required to continue "
"installation without a swap partition, but you only "
"have %(installedMem)s.")
% {"requiredMem": required, "installedMem": installed})
else:
report_warning(_("You have not specified a swap partition. "
"Although not strictly required in all cases, "
"it will significantly improve performance "
"for most installations."))
def verify_swap_uuid(storage, constraints, report_error, report_warning):
""" Verify swap uuid.
:param storage: a storage to check
:param constraints: a dictionary of constraints
:param report_error: a function for error reporting
:param report_warning: a function for warning reporting
"""
swaps = storage.fsset.swap_devices
no_uuid = [s for s in swaps if s.format.exists and not s.format.uuid]
if no_uuid:
report_warning(_("At least one of your swap devices does not have "
"a UUID, which is common in swap space created "
"using older versions of mkswap. These devices "
"will be referred to by device path in "
"/etc/fstab, which is not ideal since device "
"paths can change under a variety of "
"circumstances. "))
def verify_mountpoints_on_root(storage, constraints, report_error, report_warning):
""" Verify mountpoints on the root.
:param storage: a storage to check
:param constraints: a dictionary of constraints
:param report_error: a function for error reporting
:param report_warning: a function for warning reporting
"""
for mountpoint in storage.mountpoints:
if mountpoint in constraints[STORAGE_MUST_BE_ON_ROOT]:
report_error(_("This mount point is invalid. The %s directory must "
"be on the / file system.") % mountpoint)
def verify_mountpoints_not_on_root(storage, constraints, report_error, report_warning):
""" Verify mountpoints not on the root.
:param storage: a storage to check
:param constraints: a dictionary of constraints
:param report_error: a function for error reporting
:param report_warning: a function for warning reporting
"""
filesystems = storage.mountpoints
for mountpoint in constraints[STORAGE_MUST_NOT_BE_ON_ROOT]:
if mountpoint not in filesystems:
report_error(_("Your %s must be on a separate partition or LV.")
% mountpoint)
def verify_mountpoints_on_linuxfs(storage, constraints, report_error, report_warning):
""" Verify mountpoints on linuxfs.
:param storage: a storage to check
:param constraints: a dictionary of constraints
:param report_error: a function for error reporting
:param report_warning: a function for warning reporting
"""
filesystems = storage.mountpoints
for (mountpoint, dev) in filesystems.items():
if mountpoint in constraints[STORAGE_MUST_BE_ON_LINUXFS] \
and (not dev.format.mountable or not dev.format.linux_native):
report_error(_("The mount point %s must be on a linux file system.") % mountpoint)
def verify_unlocked_devices_have_key(storage, constraints, report_error, report_warning):
""" Verify that existing unlocked LUKS devices have some way of obtaining a key.
Blivet doesn't remove decrypted devices after a teardown of unlocked LUKS devices
and later fails to set them up without a key, so report an error to prevent a
traceback during the installation.
:param storage: a storage to check
:param constraints: a dictionary of constraints
:param report_error: a function for error reporting
:param report_warning: a function for warning reporting
"""
devices = [
d for d in storage.devices
if d.format.type == "luks"
and d.format.exists
and not d.format.has_key
and d.children
and any(c.name == d.format.map_name for c in d.children)
]
for dev in devices:
report_error(_("The existing unlocked LUKS device {} cannot be used for "
"the installation without an encryption key specified for "
"this device. Please, rescan the storage.").format(dev.name))
def verify_luks_devices_have_key(storage, constraints, report_error, report_warning):
""" Verify that all non-existant LUKS devices have some way of obtaining a key.
:param storage: a storage to check
:param constraints: a dictionary of constraints
:param report_error: a function for error reporting
:param report_warning: a function for warning reporting
Note: LUKS device creation will fail without a key.
"""
devices = [d for d in storage.devices
if d.format.type == "luks"
and not d.format.exists
and not d.format.has_key]
for dev in devices:
report_error(_("Encryption requested for LUKS device %s but no "
"encryption key specified for this device.") % (dev.name,))
def verify_luks2_memory_requirements(storage, constraints, report_error, report_warning):
""" Verify that there is enough available memory for LUKS2 format.
:param storage: a storage to check
:param constraints: a dictionary of constraints
:param report_error: a function for error reporting
:param report_warning: a function for warning reporting
"""
devices = [d for d in storage.devices
if d.format.type == "luks"
and d.format.luks_version == "luks2"
and d.format.pbkdf_args is None
and not d.format.exists]
available_memory = util.available_memory()
log.debug("Available memory: %s", available_memory)
if devices and available_memory < constraints[STORAGE_LUKS2_MIN_RAM]:
report_warning(_("The available memory is less than %(size)s which can "
"be too small for LUKS2 format. It may fail.")
% {"size": constraints[STORAGE_LUKS2_MIN_RAM]})
def verify_mounted_partitions(storage, constraints, report_error, report_warning):
""" Check the selected disks to make sure all their partitions are unmounted.
:param storage: a storage to check
:param constraints: a dictionary of constraints
:param report_error: a function for error reporting
:param report_warning: a function for warning reporting
"""
for disk in storage.disks:
if disk.protected:
continue
if not disk.partitioned:
continue
for part in disk.format.partitions:
part_dev = storage.devicetree.get_device_by_path(part.path)
if part_dev and part_dev.protected:
log.debug("Not checking protected %s for being mounted, assuming live "
"image mount", part.path)
continue
if part.busy:
report_error(_("%s is currently mounted and cannot be used for the "
"installation. Please unmount it and retry.") % part.path)
def verify_lvm_destruction(storage, constraints, report_error, report_warning):
"""Verify that destruction of LVM devices is correct.
A VG and all its PVs must be all destroyed together, nothing is allowed to remain.
:param storage: a storage to check
:param constraints: a dictionary of constraints
:param report_error: a function for error reporting
:param report_warning: a function for warning reporting
"""
# Implementation detects VGs that should be destroyed and aren't. This is because of how
# blivet implements ignoring devices: Ignoring a device hides also all dependent devices - for
# any disk, it could be a PV partition, which would then hide also all VGs and LVs that depend
# on the PV. It does not matter that the VG needs other PVs too - the hiding wins.
destroyed_vg_names = set()
all_touched_disks_by_vg = defaultdict(list)
for action in storage.devicetree.actions:
if action.is_destroy and action.is_device and action.device.type == "lvmvg":
destroyed_vg_names.add(action.device.name)
elif action.is_destroy and action.is_format and action.orig_format.type == "lvmpv":
# Check if the PV actually had any VG assigned
if action.orig_format.vg_name:
disk_name = action.device.disk.name
vg_name = action.orig_format.vg_name
all_touched_disks_by_vg[vg_name].append(disk_name)
for vg_name, disks in all_touched_disks_by_vg.items():
if vg_name not in destroyed_vg_names:
report_error(_(
"Selected disks {} contain volume group '{}' that also uses further unselected "
"disks. You must select or de-select all these disks as a set."
.format(", ".join(disks), vg_name)
))
class StorageCheckerReport(object):
"""Class for results of the storage checking."""
def __init__(self):
self.info = list()
self.errors = list()
self.warnings = list()
@property
def all_errors(self):
"""Return a list of errors and warnings."""
return self.errors + self.warnings
@property
def success(self):
"""Success, if no errors and warnings were reported."""
return not self.failure
@property
def failure(self):
"""Failure, if some errors or warnings were reported."""
return bool(self.errors or self.warnings)
def add_info(self, msg):
""" Add an error message.
:param str msg: an info message
"""
self.info.append(msg)
def add_error(self, msg):
""" Add an error message.
:param str msg: an error message
"""
self.add_info("Found sanity error: %s" % msg)
self.errors.append(msg)
def add_warning(self, msg):
""" Add a warning message.
:param str msg: a warning message
"""
self.add_info("Found sanity warning: %s" % msg)
self.warnings.append(msg)
def log(self, logger, error=True, warning=True, info=True):
""" Log the messages.
:param logger: an instance of logging.Logger
:param bool error: should we log the error messages?
:param bool warning: should we log the warning messages?
:param bool info: should we log the info messages?
"""
if info:
for msg in self.info:
logger.debug(msg)
if error:
for msg in self.errors:
logger.error(msg)
if warning:
for msg in self.warnings:
logger.warning(msg)
class StorageChecker(object):
"""Class for advanced storage checking."""
def __init__(self):
self.checks = list()
self.constraints = dict()
def add_check(self, callback):
""" Add a callback for storage checking.
:param callback: a check for the storage checking
:type callback: a function with arguments (storage, constraints,
report_error, report_warning), where storage is an instance of the
storage to check, constraints is a dictionary of constraints and
report_error and report_warning are functions for reporting messages.
"""
self.checks.append(callback)
def remove_check(self, callback):
""" Remove a callback for storage checking.
:param callback: a check for the storage checking
"""
if callback in self.checks:
self.checks.remove(callback)
def add_constraint(self, name, value):
""" Add a new constraint for storage checking.
KeyError will be raised if the constraint already exists.
:param str name: a name of the new constraint
:param value: a value of the constraint
"""
if name in self.constraints:
raise KeyError("The constraint {} already exists.".format(name))
self.constraints[name] = value
def set_constraint(self, name, value):
""" Set an existing constraint to a new value.
KeyError will be raised if the constraint does not exist.
:param str name: a name of the existing constraint
:param value: a value of the constraint
"""
if name not in self.constraints:
raise KeyError("The constraint {} does not exist.".format(name))
self.constraints[name] = value
def check(self, storage, constraints=None, skip=None):
""" Run a series of tests to verify the storage configuration.
This function is called at the end of partitioning so that we can make
sure you don't have anything silly (like no /, a really small /, etc).
:param storage: the storage object to check
:param constraints: an dictionary of constraints that will be used by
checks or None if we want to use the storage checker's constraints
:param skip: a collection of checks we want to skip or None if we don't
want to skip any
:return an instance of StorageCheckerReport with reported errors and warnings
"""
if constraints is None:
constraints = self.constraints
# Report the constraints.
result = StorageCheckerReport()
result.add_info("Storage check started with constraints %s."
% constraints)
# Process checks.
for check in self.checks:
# Skip this check.
if skip and check in skip:
result.add_info("Skipped sanity check %s." % check.__name__)
continue
# Run the check.
result.add_info("Run sanity check %s." % check.__name__)
check(storage, constraints, result.add_error, result.add_warning)
# Report the result.
if result.success:
result.add_info("Storage check finished with success.")
else:
result.add_info("Storage check finished with failure(s).")
return result
def get_default_constraint_names(self):
"""Get a list of default constraint names."""
return [
STORAGE_MIN_RAM,
STORAGE_ROOT_DEVICE_TYPES,
STORAGE_MIN_PARTITION_SIZES,
STORAGE_REQ_PARTITION_SIZES,
STORAGE_MUST_BE_ON_LINUXFS,
STORAGE_MUST_BE_ON_ROOT,
STORAGE_MUST_NOT_BE_ON_ROOT,
STORAGE_REFORMAT_WHITELIST,
STORAGE_REFORMAT_BLACKLIST,
STORAGE_SWAP_IS_RECOMMENDED,
STORAGE_LUKS2_MIN_RAM,
]
def set_default_constraints(self):
"""Set the default constraints needed by default checks."""
self.constraints = dict()
for name in self.get_default_constraint_names():
self.add_constraint(name, getattr(conf.storage_constraints, name))
def set_default_checks(self):
"""Set the default checks."""
self.checks = list()
self.add_check(verify_root)
self.add_check(verify_s390_constraints)
self.add_check(verify_partition_formatting)
self.add_check(verify_partition_sizes)
self.add_check(verify_partition_format_sizes)
self.add_check(verify_bootloader)
self.add_check(verify_gpt_biosboot)
self.add_check(verify_swap)
self.add_check(verify_swap_uuid)
self.add_check(verify_mountpoints_on_linuxfs)
self.add_check(verify_mountpoints_on_root)
self.add_check(verify_mountpoints_not_on_root)
self.add_check(verify_unlocked_devices_have_key)
self.add_check(verify_luks_devices_have_key)
self.add_check(verify_luks2_memory_requirements)
self.add_check(verify_mounted_partitions)
self.add_check(verify_lvm_destruction)
# Setup the storage checker.
storage_checker = StorageChecker()
storage_checker.set_default_constraints()
storage_checker.set_default_checks()