-
-
Notifications
You must be signed in to change notification settings - Fork 33
/
cli.py
849 lines (720 loc) · 26.2 KB
/
cli.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
# Copyright Kevin Deldycke <kevin@deldycke.com> and contributors.
# All Rights Reserved.
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
import functools
import logging
import re
from datetime import datetime
from functools import partial
from io import TextIOWrapper
from operator import getitem, itemgetter
from pathlib import Path
from sys import __stdin__, __stdout__
from time import time as time_now
import click
import click_log
import tomli
import tomli_w
from boltons.cacheutils import LRI, cached
from boltons.strutils import complement_int_list, int_ranges_from_int_list, strip_ansi
from cli_helpers.tabular_output import TabularOutputFormatter
from click_help_colors import HelpColorsCommand, HelpColorsGroup, version_option
from simplejson import dumps as json_dumps
from . import CLI_NAME, __version__, env_data, logger
from .base import CLI_FORMATS, CLIError, PackageManager
from .config import load_conf
from .managers import pool
from .platform import CURRENT_OS_ID, WINDOWS, os_label
from .version import TokenizedString
# Single formatter instance shared by every table printed by this module.
table_formatter = TabularOutputFormatter()

# Immutable set of all rendering modes: every format supported by the table
# formatter, plus the "json" mode.
RENDERING_MODES = frozenset({"json"}.union(table_formatter.supported_formats))
# List of unicode rendering modes that will fall back to ascii on Windows.
#
# Windows has a hard time printing unicode characters to the console output.
# It seems to be an effect of the cp1252 encoding and/or click not being able
# to transcode characters. Here is the traceback:
#
#   File "(...)\meta_package_manager\cli.py", line 133, in print_table
#     click.echo(line)
#   File "(...)\site-packages\click\utils.py", line 272, in echo
#     file.write(message)
#   File "(...)\lib\encodings\cp1252.py", line 19, in encode
#     return codecs.charmap_encode(input,self.errors,encoding_table)[0]
#   UnicodeEncodeError: 'charmap' codec can't encode characters in position
#   0-140: character maps to <undefined>
#
# Fortunately, the fundamental issue has been found and there is no longer any
# need to blacklist some rendering modes. See:
# a3008f8c3a42efedd88378f087202b73d907bbb7 . The construct is still kept
# around, just in case some modes need to be quickly blacklisted again.
WINDOWS_MODE_BLACKLIST = frozenset([])
# IDs of the table columns that users are allowed to sort results by.
SORTABLE_FIELDS = {"manager_id", "manager_name", "package_id", "package_name", "version"}
# Pre-rendered UI-elements.
# Both glyphs used to be the very same mis-encoded character, which made the
# success and failure markers indistinguishable once colors are stripped or
# unavailable. They are spelled out here as a check mark and a cross.
OK = click.style("✓", fg="green")
KO = click.style("✘", fg="red")

# Attach click_log's default handler and formatter to the package logger.
click_log.basic_config(logger)
def json(data):
    """Render a data structure as a pretty-printed JSON string.

    Keys are sorted and the output is indented by 4 spaces. Internal objects
    that the encoder does not know about, namely `TokenizedString` and `Path`
    instances, are serialized as their string representation. Any other
    unknown object raises a `TypeError`.
    """

    def stringify(value):
        # Only our known string-like objects are allowed to be converted.
        if not isinstance(value, (TokenizedString, Path)):
            raise TypeError(repr(value) + " is not JSON serializable.")
        return str(value)

    encoder_options = {
        "sort_keys": True,
        "indent": 4,
        "separators": (",", ": "),
        "default": stringify,
    }
    return json_dumps(data, **encoder_options)
def print_table(header_defs, rows, sort_key=None):
    """Sort rows and print them as a table with the module-level formatter.

    :param header_defs: sequence of ``(label, column_id)`` pairs, one per
        column. ``column_id`` is falsy for columns that cannot be used as a
        sorting key.
    :param rows: sequence of rows, each row being an indexable sequence of
        cells aligned with ``header_defs``. Nothing is printed, not even the
        headers, if there is no row.
    :param sort_key: optional column ID to sort on first. The remaining
        columns are then used as tie-breakers, in header order. An ID that
        matches no column is ignored.
    """
    # Do not print anything, not even table headers if no rows.
    if not rows:
        return

    header_labels = [label for label, _ in header_defs]

    # Check there is no duplicate column IDs.
    header_ids = [col_id for _, col_id in header_defs if col_id]
    assert len(header_ids) == len(set(header_ids))

    # Default sorting follows the order of headers.
    sort_order = list(range(len(header_defs)))

    # Move the sorting key's index in the front of priority.
    if sort_key and sort_key in header_ids:
        # Build an index of column id's position.
        col_index = {col_id: i for i, (_, col_id) in enumerate(header_defs) if col_id}
        sort_column_index = col_index[sort_key]
        sort_order.remove(sort_column_index)
        sort_order.insert(0, sort_column_index)

    def sort_method(line):
        """Serialize line's content for natural sorting.

        1. Extract each cell value in the order provided by `sort_order`;
        2. Strip terminal color formatting;
        3. Then tokenize each cell's content for user-friendly natural sorting.
        """
        sorting_key = []
        # Cells are fetched one index at a time: `itemgetter(*sort_order)`
        # returns a bare cell instead of a tuple for single-column tables,
        # which would make this loop iterate over the cell's own content.
        for index in sort_order:
            cell = line[index]
            if isinstance(cell, TokenizedString):
                key = cell
            else:
                key = TokenizedString(strip_ansi(cell))
            sorting_key.append(key)
        return tuple(sorting_key)

    for line in table_formatter.format_output(
        sorted(rows, key=sort_method), header_labels, disable_numparse=True
    ):
        # NOTE(review): lines are echoed as UTF-8 encoded bytes, not as str;
        # presumably to sidestep console encoding issues. To be confirmed.
        click.echo(line.encode("utf-8"))
def print_stats(data):
    """Print the total number of packages, followed by a per-manager breakdown.

    ``data`` is a mapping whose values are dicts carrying at least an ``id``
    and a sized ``packages`` collection. The breakdown lists managers from the
    one with the most packages to the one with the least.
    """
    counts = {}
    for infos in data.values():
        counts[infos["id"]] = len(infos["packages"])

    total = sum(counts.values())

    # Biggest managers first.
    ranked = sorted(counts.items(), key=itemgetter(1), reverse=True)
    breakdown = ", ".join(f"{manager_id}: {count}" for manager_id, count in ranked)
    if breakdown:
        breakdown = f" ({breakdown})"

    plural = "" if total <= 1 else "s"
    click.echo(f"{total} package{plural} total{breakdown}.")
class timeit:
    """Measure and print the elapsed execution time of a function.

    Usable both as a decorator (``@timeit()``) and as a context manager.
    Whether the measure takes place is read from the ``time`` entry of the
    current click context's object, when the timed section is entered.
    """

    def __call__(self, func):
        """Wrap ``func`` so that each call runs inside this context manager."""

        @functools.wraps(func)
        def timed(*args, **kwargs):
            with self:
                result = func(*args, **kwargs)
            return result

        return timed

    def __enter__(self):
        """Read the user's choice and start the clock if timing is requested."""
        context = click.get_current_context()
        self.measure_time = context.obj["time"]
        if self.measure_time:
            self.start_time = time_now()

    def __exit__(self, *args, **kwargs):
        """Print the elapsed time, if timing was requested."""
        if not self.measure_time:
            return
        elapsed = time_now() - self.start_time
        click.echo(f"Execution time: {elapsed:.3} seconds.")
@click.group(
    context_settings=dict(
        show_default=True,
        auto_envvar_prefix=CLI_NAME,
    )
)
@click.option(
    "-m",
    "--manager",
    type=click.Choice(pool(), case_sensitive=False),
    multiple=True,
    help="Restrict sub-command to a subset of package managers. Repeat to "
    "select multiple managers.",
)
@click.option(
    "-e",
    "--exclude",
    type=click.Choice(pool(), case_sensitive=False),
    multiple=True,
    help="Exclude a package manager. Repeat to exclude multiple managers.",
)
@click.option(
    "--ignore-auto-updates/--include-auto-updates",
    default=True,
    help="Report all outdated packages, including those tagged as "
    "auto-updating. Only applies to 'outdated' and 'upgrade' commands.",
)
@click.option(
    "-o",
    "--output-format",
    type=click.Choice(sorted(RENDERING_MODES), case_sensitive=False),
    default="psql_unicode",
    help="Rendering mode of the output.",
)
@click.option(
    "-s",
    "--sort-by",
    # Sort the choices: the field IDs live in a set, whose iteration order is
    # not stable from one run to another.
    type=click.Choice(sorted(SORTABLE_FIELDS), case_sensitive=False),
    default="manager_id",
    help="Sort results.",
)
@click.option(
    "--stats/--no-stats",
    default=True,
    help="Print per-manager package statistics.",
)
@click.option(
    "--time/--no-time",
    default=False,
    help="Measure and print elapsed execution time.",
)
@click.option(
    "--stop-on-error/--continue-on-error",
    default=False,
    help="Stop right away or continue operations on manager CLI error.",
)
@click.option(
    "-C",
    "--config",
    metavar="CONFIG_PATH",
    type=click.Path(path_type=Path, resolve_path=True),
    # default=default_conf_path(),
    help="Location of the configuration file.",
    # Force eagerness so the config option's callback gets the opportunity to
    # set the default_map values before the other options use them.
    is_eager=True,
    callback=load_conf,
    expose_value=False,
)
@click_log.simple_verbosity_option(
    logger,
    default="INFO",
    metavar="LEVEL",
    help="Either CRITICAL, ERROR, WARNING, INFO or DEBUG.",
)
@version_option(
    version=__version__,
    prog_name=CLI_NAME,
    version_color="green",
    prog_name_color="white",
    message=f"%(prog)s %(version)s\n{env_data}",
    message_color="bright_black",
)
@click.help_option("-h", "--help")
@click.pass_context
def cli(
    ctx,
    manager,
    exclude,
    ignore_auto_updates,
    output_format,
    sort_by,
    stats,
    time,
    stop_on_error,
):
    """CLI for multi-package manager upgrades."""
    # Entry point of the whole CLI: resolves the set of managers targeted by
    # the user, applies the manager-level options to them, configures the
    # logger and the table formatter, then stores everything sub-commands
    # need in `ctx.obj`.

    # Print log level.
    level = logger.level
    level_name = logging.getLevelName(level)
    logger.debug(f"Verbosity set to {level_name}.")

    # Target all available managers by default.
    target_ids = set(pool())
    # Only keeps the subset selected by the user.
    if manager:
        target_ids = target_ids.intersection(manager)
    # Remove managers excluded by the user.
    target_ids = target_ids.difference(exclude)
    target_managers = [pool()[mid] for mid in sorted(target_ids)]

    # Apply manager-level options.
    for m_obj in target_managers:
        # Should the manager raise on error or not?
        m_obj.stop_on_error = stop_on_error
        # Should we include auto-update packages or not?
        m_obj.ignore_auto_updates = ignore_auto_updates

    # Pre-filters inactive managers.
    def keep_available(manager):
        if manager.available:
            return True
        logger.warning(f"Skip unavailable {manager.id} manager.")
        return False

    # Use an iterator to not trigger log messages for the 'managers' subcommand
    # which is not using this variable. Being an iterator, it can only be
    # consumed once.
    active_managers = filter(keep_available, target_managers)

    # Silence all log message for JSON rendering unless in debug mode.
    if output_format == "json" and level_name != "DEBUG":
        logger.setLevel(logging.CRITICAL * 2)

    # Setup the table formatter.
    if output_format != "json":
        # Fallback unicode-rendering to safe ascii on Windows.
        if CURRENT_OS_ID == WINDOWS and output_format in WINDOWS_MODE_BLACKLIST:
            output_format = "ascii"
        table_formatter.format_name = output_format

    # Load up global options to the context.
    ctx.obj = {
        "target_managers": target_managers,
        "active_managers": active_managers,
        "output_format": output_format,
        "sort_by": sort_by,
        "stats": stats,
        "time": time,
    }

    @ctx.call_on_close
    def reset_logger():
        """Forces the logger level to reset at the end of each CLI execution, as it
        might pollute the logger state between multiple test calls.
        """
        logger.setLevel(logging.NOTSET)
@cli.command(short_help="List supported package managers and their location.")
@click.pass_context
@timeit()
def managers(ctx):
    """List all supported package managers and their presence on the system."""
    # Works on all targeted managers, available or not, and renders their
    # metadata either as JSON or as a sorted table.
    settings = ctx.obj
    target_managers = settings["target_managers"]
    output_format = settings["output_format"]
    sort_by = settings["sort_by"]

    # Machine-friendly data rendering.
    if output_format == "json":
        # Attributes of a manager exported as-is.
        fields = (
            "name",
            "id",
            "supported",
            "cli_path",
            "executable",
            "version",
            "fresh",
            "available",
        )
        manager_data = {}
        for manager in target_managers:
            record = {fid: getattr(manager, fid) for fid in fields}
            # Errors are collected last, once all attributes above have been
            # evaluated, and de-duplicated through a set.
            record["errors"] = list({expt.error for expt in manager.cli_errors})
            manager_data[manager.id] = record
        click.echo(json(manager_data))
        return

    # Human-friendly content rendering.
    table = []
    for manager in target_managers:

        # OS column: a mark, followed by the supported platforms if the
        # current one is not among them.
        if manager.supported:
            os_infos = OK
        else:
            platform_labels = ", ".join(
                sorted(os_label(os_id) for os_id in manager.platforms)
            )
            os_infos = KO + f" {platform_labels} only"

        # CLI path column: a mark, followed by the path found or the names
        # that were looked for.
        if manager.cli_path:
            cli_infos = f"{OK} {manager.cli_path}"
        else:
            cli_infos = f"{KO} no {', '.join(manager.cli_names)} found"

        # Version column: only filled for executable CLIs. The requirement is
        # appended when the detected version is not fresh.
        version_infos = ""
        if manager.executable:
            version_infos = OK if manager.fresh else KO
            if manager.version:
                version_infos += f" {manager.version}"
                if not manager.fresh:
                    version_infos += f" {manager.requirement}"

        id_color = "green" if manager.fresh else "red"
        executable_mark = OK if manager.executable else ""
        table.append(
            [
                manager.name,
                click.style(manager.id, fg=id_color),
                os_infos,
                cli_infos,
                executable_mark,
                version_infos,
            ]
        )

    headers = [
        ("Package manager", "manager_name"),
        ("ID", "manager_id"),
        ("Supported", None),
        ("CLI", None),
        ("Executable", None),
        ("Version", "version"),
    ]
    print_table(headers, table, sort_by)
@cli.command(short_help="Sync local package info.")
@click.pass_context
@timeit()
def sync(ctx):
    """Sync local package metadata and info from external sources."""
    # Delegate to each active manager, one after the other.
    for manager in ctx.obj["active_managers"]:
        manager.sync()
@cli.command(short_help="Cleanup local data.")
@click.pass_context
@timeit()
def cleanup(ctx):
    """Cleanup local data and temporary artifacts."""
    # Delegate to each active manager, one after the other.
    for manager in ctx.obj["active_managers"]:
        manager.cleanup()
@cli.command(short_help="List installed packages.")
@click.pass_context
@timeit()
def installed(ctx):
    """List all packages installed on the system from all managers."""
    settings = ctx.obj
    active_managers = settings["active_managers"]
    output_format = settings["output_format"]
    sort_by = settings["sort_by"]
    stats = settings["stats"]

    # Collect installed packages, keyed by manager ID.
    installed_data = {}
    for manager in active_managers:
        packages = list(manager.installed.values())
        # Errors are gathered once the package listing is done, so that all
        # those encountered so far are reported. The set removes duplicates.
        errors = list({expt.error for expt in manager.cli_errors})
        installed_data[manager.id] = {
            "id": manager.id,
            "name": manager.name,
            "packages": packages,
            "errors": errors,
        }

    # Machine-friendly data rendering.
    if output_format == "json":
        click.echo(json(installed_data))
        return

    # Human-friendly content rendering: one row per package.
    table = []
    for manager_id, manager_infos in installed_data.items():
        for info in manager_infos["packages"]:
            table.append(
                [
                    info["name"],
                    info["id"],
                    manager_id,
                    # Unknown versions are rendered as a question mark.
                    info["installed_version"] or "?",
                ]
            )

    # Sort and print table.
    headers = [
        ("Package name", "package_name"),
        ("ID", "package_id"),
        ("Manager", "manager_id"),
        ("Installed version", "version"),
    ]
    print_table(headers, table, sort_by)

    if stats:
        print_stats(installed_data)
@cli.command(short_help="Search packages.")
@click.option(
    "--extended/--package-name",
    default=False,
    help="Extend search to additional package metadata like description, "
    "instead of restricting it package ID and name.",
)
@click.option(
    "--exact/--fuzzy",
    default=False,
    help="Only returns exact matches, or enable fuzzy search in substrings.",
)
@click.argument("query", type=click.STRING, required=True)
@click.pass_context
@timeit()
def search(ctx, extended, exact, query):
    """Search packages from all managers."""
    # Queries each active manager, then renders the matches either as JSON or
    # as a sorted table in which the parts of package names and IDs matching
    # the query are printed in bold.
    active_managers = ctx.obj["active_managers"]
    output_format = ctx.obj["output_format"]
    sort_by = ctx.obj["sort_by"]
    stats = ctx.obj["stats"]

    # Build-up a global list of package matches per manager.
    matches = {}
    for manager in active_managers:
        matches[manager.id] = {
            "id": manager.id,
            "name": manager.name,
            "packages": list(manager.search(query, extended, exact).values()),
        }

        # Serialize errors at the last minute to gather all we encountered.
        matches[manager.id]["errors"] = list(
            {expt.error for expt in manager.cli_errors}
        )

    # Machine-friendly data rendering.
    if output_format == "json":
        click.echo(json(matches))
        return

    # Prepare highlighting helpers: the full query and each of its tokens are
    # candidates for highlighting.
    query_parts = {query}.union(map(str, TokenizedString(query)))

    @cached(LRI(max_size=1000))
    def highlight(string):
        """Return ``string`` with all occurrences of query parts in bold."""
        # Ranges of character indices flagged for highlighting.
        ranges = set()

        # TODO: Fix upper-case matching, as tokenizer lower them down.
        for part in query_parts:
            # An empty part matches everywhere and has no character to
            # highlight: it would only produce malformed ranges.
            if not part:
                continue
            # Search for occurrences of query parts in original string.
            if part in string:
                # Flag matching substrings for highlighting. The part is
                # user-provided text, not a pattern: escape it so characters
                # like `+`, `.` or `(` are matched literally by the regex
                # engine instead of raising or matching something else.
                pattern = re.escape(part)
                for match in re.finditer(pattern, string):
                    match_start = match.start()
                    match_end = match_start + len(part) - 1
                    ranges.add(f"{match_start}-{match_end}")

        # Reduce index ranges, compute complement ranges, transform them to
        # list of integers.
        ranges = ",".join(ranges)
        bold_ranges = int_ranges_from_int_list(ranges)
        normal_ranges = int_ranges_from_int_list(
            complement_int_list(ranges, range_end=len(string))
        )

        # Apply style to range of characters flagged as matching.
        styled_str = ""
        for i, j in sorted(bold_ranges + normal_ranges):
            segment = getitem(string, slice(i, j + 1))
            if (i, j) in bold_ranges:
                segment = click.style(segment, bold=True)
            styled_str += segment

        return styled_str

    # Human-friendly content rendering.
    table = []
    for manager_id, matching_pkg in matches.items():
        table += [
            [
                highlight(info["name"]),
                highlight(info["id"]),
                manager_id,
                info["latest_version"] if info["latest_version"] else "?",
            ]
            for info in matching_pkg["packages"]
        ]

    # Sort and print table.
    print_table(
        [
            ("Package name", "package_name"),
            ("ID", "package_id"),
            ("Manager", "manager_id"),
            ("Latest version", "version"),
        ],
        table,
        sort_by,
    )

    if stats:
        print_stats(matches)
@cli.command(short_help="List outdated packages.")
@click.option(
    "-c",
    "--cli-format",
    type=click.Choice(CLI_FORMATS, case_sensitive=False),
    default="plain",
    help="Format of CLI fields in JSON output.",
)
@click.pass_context
@timeit()
def outdated(ctx, cli_format):
    """List available package upgrades and their versions for each manager."""
    settings = ctx.obj
    active_managers = settings["active_managers"]
    output_format = settings["output_format"]
    sort_by = settings["sort_by"]
    stats = settings["stats"]

    # All CLIs are rendered in the format requested by the user.
    render_cli = partial(PackageManager.render_cli, cli_format=cli_format)

    # Collect outdated packages, keyed by manager ID.
    outdated_data = {}
    for manager in active_managers:

        # Work on copies of package infos, augmented with their upgrade CLI.
        packages = [dict(pkg) for pkg in manager.outdated.values()]
        for info in packages:
            info["upgrade_cli"] = render_cli(manager.upgrade_cli(info["id"]))

        entry = {
            "id": manager.id,
            "name": manager.name,
            "packages": packages,
        }

        # The full-upgrade CLI is only provided if some outdated packages
        # were detected.
        if manager.outdated:
            try:
                upgrade_all_cli = manager.upgrade_all_cli()
            except NotImplementedError:
                # Fallback on mpm itself which is capable of simulating a full
                # upgrade.
                upgrade_all_cli = [CLI_NAME, "--manager", manager.id, "upgrade"]
            entry["upgrade_all_cli"] = render_cli(upgrade_all_cli)

        # Errors are gathered last, to report all those encountered so far.
        # The set removes duplicates.
        entry["errors"] = list({expt.error for expt in manager.cli_errors})

        outdated_data[manager.id] = entry

    # Machine-friendly data rendering.
    if output_format == "json":
        click.echo(json(outdated_data))
        return

    # Human-friendly content rendering: one row per package.
    table = []
    for manager_id, manager_infos in outdated_data.items():
        for info in manager_infos["packages"]:
            table.append(
                [
                    info["name"],
                    info["id"],
                    manager_id,
                    # Unknown installed versions are rendered as a question
                    # mark.
                    info["installed_version"] or "?",
                    info["latest_version"],
                ]
            )

    # Sort and print table.
    headers = [
        ("Package name", "package_name"),
        ("ID", "package_id"),
        ("Manager", "manager_id"),
        ("Installed version", "version"),
        ("Latest version", None),
    ]
    print_table(headers, table, sort_by)

    if stats:
        print_stats(outdated_data)
@cli.command(short_help="Upgrade all packages.")
@click.option(
    "-d",
    "--dry-run",
    is_flag=True,
    default=False,
    help="Do not actually perform any upgrade, just simulate CLI calls.",
)
@click.pass_context
@timeit()
def upgrade(ctx, dry_run):
    """Perform a full package upgrade on all available managers."""
    active_managers = ctx.obj["active_managers"]

    for manager in active_managers:
        logger.info(f"Updating all outdated packages from {manager.id}...")
        try:
            output = manager.upgrade_all(dry_run=dry_run)
        except CLIError as expt:
            logger.error(expt.error)
            # A failed upgrade produced no output for this manager: move on
            # to the next one instead of reading an unbound variable, or
            # logging again the output of the previous manager.
            continue

        if output:
            logger.info(output)
@cli.command(short_help="Save installed packages to a TOML file.")
@click.argument("toml_output", type=click.File("w"), default="-")
@click.pass_context
@timeit()
def backup(ctx, toml_output):
    """Dump the list of installed packages to a TOML file.
    By default the generated TOML content is displayed directly in the console
    output. So `mpm backup` is the same as a call to `mpm backup -`. To have
    the result written in a file on disk, specify the output file like so:
    `mpm backup ./mpm-packages.toml`.
    The TOML file can then be safely consumed by the `mpm restore` command.
    """
    settings = ctx.obj
    active_managers = settings["active_managers"]
    stats = settings["stats"]

    # XXX Hack for unittests to pass, while we wait for
    # https://github.com/pallets/click/pull/1497
    if isinstance(toml_output, TextIOWrapper):
        toml_output = __stdout__

    # The standard output is referred to by its name, while a real file is
    # referred to by its resolved path.
    is_stdout = toml_output is __stdout__
    if is_stdout:
        toml_filepath = toml_output.name
    else:
        toml_filepath = Path(toml_output.name).resolve()

    logger.info(f"Backup package list to {toml_filepath}")

    # Sanity checks only make sense for a file on disk.
    if not is_stdout:
        if toml_filepath.exists() and not toml_filepath.is_file():
            logger.error("Target file exist and is not a file.")
            return
        if toml_filepath.suffix.lower() != ".toml":
            logger.error("Target file is not a TOML file.")
            return

    # The document is accumulated as chunks, starting with some metadata left
    # as comments.
    chunks = [
        f"# Generated by {CLI_NAME} {__version__}.\n",
        f"# Timestamp: {datetime.now().isoformat()}.\n",
    ]

    installed_data = {}

    # Each package manager gets its own section.
    for manager in active_managers:
        logger.info(f"Dumping packages from {manager.id}...")

        # Prepare data for stats.
        installed_data[manager.id] = {
            "id": manager.id,
            "packages": manager.installed.values(),
        }

        pkg_data = sorted(
            (pkg["id"], pkg["installed_version"])
            for pkg in manager.installed.values()
        )

        # Managers without any package do not produce a section.
        if not pkg_data:
            continue

        section = {}
        for package_id, package_version in pkg_data:
            section[package_id] = f"^{package_version}"
        chunks.append("\n" + tomli_w.dumps({manager.id: section}))

    toml_output.write("".join(chunks))

    if stats:
        print_stats(installed_data)
@cli.command(short_help="Install packages in batch as specified by TOML files.")
@click.argument("toml_files", type=click.File("r"), required=True, nargs=-1)
@click.pass_context
@timeit()
def restore(ctx, toml_files):
    """Read TOML files then install or upgrade each package referenced in
    them.
    """
    # The active managers are provided as a one-shot iterator, but they are
    # walked once per TOML file below. Materialize them into a list, or every
    # file after the first one would be processed against no manager at all.
    # As a consequence, warnings about unavailable managers are all logged
    # upfront, before the first file is loaded.
    active_managers = list(ctx.obj["active_managers"])

    # IDs of all the managers known to mpm.
    known_managers = pool()

    for toml_input in toml_files:

        # The standard input is referred to by its name, while a real file is
        # referred to by its resolved path.
        toml_filepath = (
            toml_input.name
            if toml_input is __stdin__
            else Path(toml_input.name).resolve()
        )
        logger.info(f"Load package list from {toml_filepath}")

        doc = tomli.loads(toml_input.read())

        # List unrecognized sections.
        ignored_sections = [
            f"[{section}]" for section in doc if section not in known_managers
        ]
        if ignored_sections:
            plural = "s" if len(ignored_sections) > 1 else ""
            sections = ", ".join(ignored_sections)
            logger.warning(f"Ignore {sections} section{plural}.")

        for manager in active_managers:
            if manager.id not in doc:
                logger.warning(f"No [{manager.id}] section found.")
                continue
            logger.info(f"Restore {manager.id} packages...")
            logger.warning("Installation of packages not implemented yet.")
            # for package_id, version in doc[manager.id].items():
            #     raise NotImplemented