/
cmd2.py
4288 lines (3511 loc) · 197 KB
/
cmd2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# coding=utf-8
"""Variant on standard library's cmd with extra features.
To use, simply import cmd2.Cmd instead of cmd.Cmd; use precisely as though you
were using the standard library's cmd, while enjoying the extra features.
Searchable command history (commands: "history")
Run commands from file, save to file, edit commands in file
Multi-line commands
Special-character shortcut commands (beyond cmd's "?" and "!")
Settable environment parameters
Parsing commands with `argparse` argument parsers (flags)
Redirection to file or paste buffer (clipboard) with > or >>
Easy transcript-based testing of applications (see examples/example.py)
Bash-style ``select`` available
Note that redirection with > and | will only work if `self.poutput()`
is used in place of `print`.
- Catherine Devlin, Jan 03 2008 - catherinedevlin.blogspot.com
Git repository on GitHub at https://github.com/python-cmd2/cmd2
"""
# This module has many imports, quite a few of which are only
# infrequently utilized. To reduce the initial overhead of
# import this module, many of these imports are lazy-loaded
# i.e. we only import the module when we use it
# For example, we don't import the 'traceback' module
# until the pexcept() function is called and the debug
# setting is True
import argparse
import cmd
import glob
import inspect
import os
import pickle
import re
import sys
import threading
from code import InteractiveConsole
from collections import namedtuple
from contextlib import redirect_stdout
from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Tuple, Type, Union
from . import ansi, constants, plugin, utils
from .argparse_custom import DEFAULT_ARGUMENT_PARSER, CompletionItem
from .clipboard import can_clip, get_paste_buffer, write_to_paste_buffer
from .decorators import with_argparser
from .exceptions import Cmd2ArgparseError, Cmd2ShlexError, EmbeddedConsoleExit, EmptyStatement, RedirectionError
from .history import History, HistoryItem
from .parsing import Macro, MacroArg, Statement, StatementParser, shlex_split
from .rl_utils import RlType, rl_get_point, rl_make_safe_prompt, rl_set_prompt, rl_type, rl_warning, vt100_support
from .utils import CompletionError, Settable
# Set up readline
if rl_type == RlType.NONE: # pragma: no cover
sys.stderr.write(ansi.style_warning(rl_warning))
else:
from .rl_utils import rl_force_redisplay, readline
# Used by rlcompleter in Python console loaded by py command
orig_rl_delims = readline.get_completer_delims()
if rl_type == RlType.PYREADLINE:
# Save the original pyreadline display completion function since we need to override it and restore it
# noinspection PyProtectedMember,PyUnresolvedReferences
orig_pyreadline_display = readline.rl.mode._display_completions
elif rl_type == RlType.GNU:
# Get the readline lib so we can make changes to it
import ctypes
from .rl_utils import readline_lib
rl_basic_quote_characters = ctypes.c_char_p.in_dll(readline_lib, "rl_basic_quote_characters")
orig_rl_basic_quotes = ctypes.cast(rl_basic_quote_characters, ctypes.c_void_p).value
# Detect whether IPython is installed to determine if the built-in "ipy" command should be included
ipython_available = True
try:
# noinspection PyUnresolvedReferences,PyPackageRequirements
from IPython import embed
except ImportError: # pragma: no cover
ipython_available = False
class _SavedReadlineSettings:
"""readline settings that are backed up when switching between readline environments"""
def __init__(self):
self.completer = None
self.delims = ''
self.basic_quotes = None
class _SavedCmd2Env:
"""cmd2 environment settings that are backed up when entering an interactive Python shell"""
def __init__(self):
self.readline_settings = _SavedReadlineSettings()
self.readline_module = None
self.history = []
self.sys_stdout = None
self.sys_stdin = None
# Contains data about a disabled command which is used to restore its original functions when the command is enabled
DisabledCommand = namedtuple('DisabledCommand', ['command_function', 'help_function', 'completer_function'])
class Cmd(cmd.Cmd):
"""An easy but powerful framework for writing line-oriented command interpreters.
Extends the Python Standard Library’s cmd package by adding a lot of useful features
to the out of the box configuration.
Line-oriented command interpreters are often useful for test harnesses, internal tools, and rapid prototypes.
"""
DEFAULT_EDITOR = utils.find_editor()
INTERNAL_COMMAND_EPILOG = ("Notes:\n"
" This command is for internal use and is not intended to be called from the\n"
" command line.")
# Sorting keys for strings
ALPHABETICAL_SORT_KEY = utils.norm_fold
NATURAL_SORT_KEY = utils.natural_keys
def __init__(self, completekey: str = 'tab', stdin=None, stdout=None, *,
persistent_history_file: str = '', persistent_history_length: int = 1000,
startup_script: str = '', use_ipython: bool = False,
allow_cli_args: bool = True, transcript_files: Optional[List[str]] = None,
allow_redirection: bool = True, multiline_commands: Optional[List[str]] = None,
terminators: Optional[List[str]] = None, shortcuts: Optional[Dict[str, str]] = None) -> None:
"""An easy but powerful framework for writing line-oriented command
interpreters. Extends Python's cmd package.
:param completekey: readline name of a completion key, default to Tab
:param stdin: alternate input file object, if not specified, sys.stdin is used
:param stdout: alternate output file object, if not specified, sys.stdout is used
:param persistent_history_file: file path to load a persistent cmd2 command history from
:param persistent_history_length: max number of history items to write
to the persistent history file
:param startup_script: file path to a script to execute at startup
:param use_ipython: should the "ipy" command be included for an embedded IPython shell
:param allow_cli_args: if ``True``, then :meth:`cmd2.Cmd.__init__` will process command
line arguments as either commands to be run or, if ``-t`` or
``--test`` are given, transcript files to run. This should be
set to ``False`` if your application parses its own command line
arguments.
:param transcript_files: pass a list of transcript files to be run on initialization.
This allows running transcript tests when ``allow_cli_args``
is ``False``. If ``allow_cli_args`` is ``True`` this parameter
is ignored.
:param allow_redirection: If ``False``, prevent output redirection and piping to shell
commands. This parameter prevents redirection and piping, but
does not alter parsing behavior. A user can still type
redirection and piping tokens, and they will be parsed as such
but they won't do anything.
:param multiline_commands: list of commands allowed to accept multi-line input
:param terminators: list of characters that terminate a command. These are mainly
intended for terminating multiline commands, but will also
terminate single-line commands. If not supplied, the default
is a semicolon. If your app only contains single-line commands
and you want terminators to be treated as literals by the parser,
then set this to an empty list.
:param shortcuts: dictionary containing shortcuts for commands. If not supplied,
then defaults to constants.DEFAULT_SHORTCUTS. If you do not want
any shortcuts, pass an empty dictionary.
"""
# If use_ipython is False, make sure the ipy command isn't available in this instance
if not use_ipython:
try:
self.do_ipy = None
except AttributeError:
pass
# initialize plugin system
# needs to be done before we call __init__(0)
self._initialize_plugin_system()
# Call super class constructor
super().__init__(completekey=completekey, stdin=stdin, stdout=stdout)
# Attributes which should NOT be dynamically settable via the set command at runtime
self.default_to_shell = False # Attempt to run unrecognized commands as shell commands
self.quit_on_sigint = False # Ctrl-C at the prompt will quit the program instead of just resetting prompt
self.allow_redirection = allow_redirection # Security setting to prevent redirection of stdout
# Attributes which ARE dynamically settable via the set command at runtime
self.debug = False
self.echo = False
self.editor = Cmd.DEFAULT_EDITOR
self.feedback_to_output = False # Do not include nonessentials in >, | output by default (things like timing)
self.quiet = False # Do not suppress nonessential output
self.timing = False # Prints elapsed time for each command
# The maximum number of CompletionItems to display during tab completion. If the number of completion
# suggestions exceeds this number, they will be displayed in the typical columnized format and will
# not include the description value of the CompletionItems.
self.max_completion_items = 50
# A dictionary mapping settable names to their Settable instance
self.settables = dict()
self.build_settables()
# Use as prompt for multiline commands on the 2nd+ line of input
self.continuation_prompt = '> '
# Allow access to your application in embedded Python shells and scripts py via self
self.self_in_py = False
# Commands to exclude from the help menu and tab completion
self.hidden_commands = ['eof', '_relative_run_script']
# Initialize history
self._persistent_history_length = persistent_history_length
self._initialize_history(persistent_history_file)
# Commands to exclude from the history command
self.exclude_from_history = ['eof', 'history']
# Dictionary of macro names and their values
self.macros = dict()
# Keeps track of typed command history in the Python shell
self._py_history = []
# The name by which Python environments refer to the PyBridge to call app commands
self.py_bridge_name = 'app'
# Defines app-specific variables/functions available in Python shells and pyscripts
self.py_locals = dict()
# True if running inside a Python script or interactive console, False otherwise
self._in_py = False
self.statement_parser = StatementParser(terminators=terminators,
multiline_commands=multiline_commands,
shortcuts=shortcuts)
# Verify commands don't have invalid names (like starting with a shortcut)
for cur_cmd in self.get_all_commands():
valid, errmsg = self.statement_parser.is_valid_command(cur_cmd)
if not valid:
raise ValueError("Invalid command name {!r}: {}".format(cur_cmd, errmsg))
# Stores results from the last command run to enable usage of results in a Python script or interactive console
# Built-in commands don't make use of this. It is purely there for user-defined commands and convenience.
self.last_result = None
# Used by run_script command to store current script dir as a LIFO queue to support _relative_run_script command
self._script_dir = []
# Context manager used to protect critical sections in the main thread from stopping due to a KeyboardInterrupt
self.sigint_protection = utils.ContextFlag()
# If the current command created a process to pipe to, then this will be a ProcReader object.
# Otherwise it will be None. It's used to know when a pipe process can be killed and/or waited upon.
self._cur_pipe_proc_reader = None
# Used to keep track of whether we are redirecting or piping output
self._redirecting = False
# Used to keep track of whether a continuation prompt is being displayed
self._at_continuation_prompt = False
# The multiline command currently being typed which is used to tab complete multiline commands.
self._multiline_in_progress = ''
# Set the header used for the help function's listing of documented functions
self.doc_header = "Documented commands (use 'help -v' for verbose/'help <topic>' for details):"
# The error that prints when no help information can be found
self.help_error = "No help on {}"
# The error that prints when a non-existent command is run
self.default_error = "{} is not a recognized command, alias, or macro"
# If non-empty, this string will be displayed if a broken pipe error occurs
self.broken_pipe_warning = ''
# Commands that will run at the beginning of the command loop
self._startup_commands = []
# If a startup script is provided and exists, then execute it in the startup commands
if startup_script:
startup_script = os.path.abspath(os.path.expanduser(startup_script))
if os.path.exists(startup_script):
self._startup_commands.append("run_script {}".format(utils.quote_string(startup_script)))
# Transcript files to run instead of interactive command loop
self._transcript_files = None
# Check for command line args
if allow_cli_args:
parser = argparse.ArgumentParser()
parser.add_argument('-t', '--test', action="store_true",
help='Test against transcript(s) in FILE (wildcards OK)')
callopts, callargs = parser.parse_known_args()
# If transcript testing was called for, use other arguments as transcript files
if callopts.test:
self._transcript_files = callargs
# If commands were supplied at invocation, then add them to the command queue
elif callargs:
self._startup_commands.extend(callargs)
elif transcript_files:
self._transcript_files = transcript_files
# Set the pager(s) for use with the ppaged() method for displaying output using a pager
if sys.platform.startswith('win'):
self.pager = self.pager_chop = 'more'
else:
# Here is the meaning of the various flags we are using with the less command:
# -S causes lines longer than the screen width to be chopped (truncated) rather than wrapped
# -R causes ANSI "style" escape sequences to be output in raw form (i.e. colors are displayed)
# -X disables sending the termcap initialization and deinitialization strings to the terminal
# -F causes less to automatically exit if the entire file can be displayed on the first screen
self.pager = 'less -RXF'
self.pager_chop = 'less -SRXF'
# This boolean flag determines whether or not the cmd2 application can interact with the clipboard
self._can_clip = can_clip
# This determines the value returned by cmdloop() when exiting the application
self.exit_code = 0
# This lock should be acquired before doing any asynchronous changes to the terminal to
# ensure the updates to the terminal don't interfere with the input being typed or output
# being printed by a command.
self.terminal_lock = threading.RLock()
# Commands that have been disabled from use. This is to support commands that are only available
# during specific states of the application. This dictionary's keys are the command names and its
# values are DisabledCommand objects.
self.disabled_commands = dict()
# If any command has been categorized, then all other commands that haven't been categorized
# will display under this section in the help output.
self.default_category = 'Uncategorized'
# The default key for sorting string results. Its default value performs a case-insensitive alphabetical sort.
# If natural sorting is preferred, then set this to NATURAL_SORT_KEY.
# cmd2 uses this key for sorting:
# command and category names
# alias, macro, settable, and shortcut names
# tab completion results when self.matches_sorted is False
self.default_sort_key = Cmd.ALPHABETICAL_SORT_KEY
############################################################################################################
# The following variables are used by tab completion functions. They are reset each time complete() is run
# in _reset_completion_defaults() and it is up to completer functions to set them before returning results.
############################################################################################################
# If True and a single match is returned to complete(), then a space will be appended
# if the match appears at the end of the line
self.allow_appended_space = True
# If True and a single match is returned to complete(), then a closing quote
# will be added if there is an unmatched opening quote
self.allow_closing_quote = True
# An optional header that prints above the tab completion suggestions
self.completion_header = ''
# Used by complete() for readline tab completion
self.completion_matches = []
# Use this list if you are completing strings that contain a common delimiter and you only want to
# display the final portion of the matches as the tab completion suggestions. The full matches
# still must be returned from your completer function. For an example, look at path_complete()
# which uses this to show only the basename of paths as the suggestions. delimiter_complete() also
# populates this list.
self.display_matches = []
# Used by functions like path_complete() and delimiter_complete() to properly
# quote matches that are completed in a delimited fashion
self.matches_delimited = False
# Set to True before returning matches to complete() in cases where matches have already been sorted.
# If False, then complete() will sort the matches using self.default_sort_key before they are displayed.
self.matches_sorted = False
def add_settable(self, settable: Settable) -> None:
"""
Convenience method to add a settable parameter to ``self.settables``
:param settable: Settable object being added
"""
self.settables[settable.name] = settable
def remove_settable(self, name: str) -> None:
"""
Convenience method for removing a settable parameter from ``self.settables``
:param name: name of the settable being removed
:raises: KeyError if the Settable matches this name
"""
try:
del self.settables[name]
except KeyError:
raise KeyError(name + " is not a settable parameter")
def build_settables(self):
"""Create the dictionary of user-settable parameters"""
self.add_settable(Settable('allow_style', str,
'Allow ANSI text style sequences in output (valid values: '
'{}, {}, {})'.format(ansi.STYLE_TERMINAL,
ansi.STYLE_ALWAYS,
ansi.STYLE_NEVER),
choices=[ansi.STYLE_TERMINAL, ansi.STYLE_ALWAYS, ansi.STYLE_NEVER]))
self.add_settable(Settable('debug', bool, "Show full traceback on exception"))
self.add_settable(Settable('echo', bool, "Echo command issued into output"))
self.add_settable(Settable('editor', str, "Program used by 'edit'"))
self.add_settable(Settable('feedback_to_output', bool, "Include nonessentials in '|', '>' results"))
self.add_settable(Settable('max_completion_items', int,
"Maximum number of CompletionItems to display during tab completion"))
self.add_settable(Settable('quiet', bool, "Don't print nonessential feedback"))
self.add_settable(Settable('timing', bool, "Report execution times"))
# ----- Methods related to presenting output to the user -----
@property
def allow_style(self) -> str:
"""Read-only property needed to support do_set when it reads allow_style"""
return ansi.allow_style
@allow_style.setter
def allow_style(self, new_val: str) -> None:
"""Setter property needed to support do_set when it updates allow_style"""
new_val = new_val.capitalize()
if new_val in [ansi.STYLE_TERMINAL, ansi.STYLE_ALWAYS, ansi.STYLE_NEVER]:
ansi.allow_style = new_val
else:
raise ValueError("must be {}, {}, or {} (case-insensitive)".format(ansi.STYLE_TERMINAL, ansi.STYLE_ALWAYS,
ansi.STYLE_NEVER))
def _completion_supported(self) -> bool:
"""Return whether tab completion is supported"""
return self.use_rawinput and self.completekey and rl_type != RlType.NONE
@property
def visible_prompt(self) -> str:
"""Read-only property to get the visible prompt with any ANSI style escape codes stripped.
Used by transcript testing to make it easier and more reliable when users are doing things like coloring the
prompt using ANSI color codes.
:return: prompt stripped of any ANSI escape codes
"""
return ansi.strip_style(self.prompt)
def poutput(self, msg: Any = '', *, end: str = '\n') -> None:
"""Print message to self.stdout and appends a newline by default
Also handles BrokenPipeError exceptions for when a commands's output has
been piped to another process and that process terminates before the
cmd2 command is finished executing.
:param msg: message to print (anything convertible to a str with '{}'.format() is OK)
:param end: string appended after the end of the message, default a newline
"""
try:
ansi.style_aware_write(self.stdout, "{}{}".format(msg, end))
except BrokenPipeError:
# This occurs if a command's output is being piped to another
# process and that process closes before the command is
# finished. If you would like your application to print a
# warning message, then set the broken_pipe_warning attribute
# to the message you want printed.
if self.broken_pipe_warning:
sys.stderr.write(self.broken_pipe_warning)
# noinspection PyMethodMayBeStatic
def perror(self, msg: Any = '', *, end: str = '\n', apply_style: bool = True) -> None:
"""Print message to sys.stderr
:param msg: message to print (anything convertible to a str with '{}'.format() is OK)
:param end: string appended after the end of the message, default a newline
:param apply_style: If True, then ansi.style_error will be applied to the message text. Set to False in cases
where the message text already has the desired style. Defaults to True.
"""
if apply_style:
final_msg = ansi.style_error(msg)
else:
final_msg = "{}".format(msg)
ansi.style_aware_write(sys.stderr, final_msg + end)
def pwarning(self, msg: Any = '', *, end: str = '\n', apply_style: bool = True) -> None:
"""Wraps perror, but applies ansi.style_warning by default
:param msg: message to print (anything convertible to a str with '{}'.format() is OK)
:param end: string appended after the end of the message, default a newline
:param apply_style: If True, then ansi.style_warning will be applied to the message text. Set to False in cases
where the message text already has the desired style. Defaults to True.
"""
if apply_style:
msg = ansi.style_warning(msg)
self.perror(msg, end=end, apply_style=False)
def pexcept(self, msg: Any, *, end: str = '\n', apply_style: bool = True) -> None:
"""Print Exception message to sys.stderr. If debug is true, print exception traceback if one exists.
:param msg: message or Exception to print
:param end: string appended after the end of the message, default a newline
:param apply_style: If True, then ansi.style_error will be applied to the message text. Set to False in cases
where the message text already has the desired style. Defaults to True.
"""
if self.debug and sys.exc_info() != (None, None, None):
import traceback
traceback.print_exc()
if isinstance(msg, Exception):
final_msg = "EXCEPTION of type '{}' occurred with message: '{}'".format(type(msg).__name__, msg)
else:
final_msg = "{}".format(msg)
if apply_style:
final_msg = ansi.style_error(final_msg)
if not self.debug and 'debug' in self.settables:
warning = "\nTo enable full traceback, run the following command: 'set debug true'"
final_msg += ansi.style_warning(warning)
self.perror(final_msg, end=end, apply_style=False)
def pfeedback(self, msg: Any, *, end: str = '\n') -> None:
"""For printing nonessential feedback. Can be silenced with `quiet`.
Inclusion in redirected output is controlled by `feedback_to_output`.
:param msg: message to print (anything convertible to a str with '{}'.format() is OK)
:param end: string appended after the end of the message, default a newline
"""
if not self.quiet:
if self.feedback_to_output:
self.poutput(msg, end=end)
else:
self.perror(msg, end=end, apply_style=False)
def ppaged(self, msg: Any, *, end: str = '\n', chop: bool = False) -> None:
"""Print output using a pager if it would go off screen and stdout isn't currently being redirected.
Never uses a pager inside of a script (Python or text) or when output is being redirected or piped or when
stdout or stdin are not a fully functional terminal.
:param msg: message to print to current stdout (anything convertible to a str with '{}'.format() is OK)
:param end: string appended after the end of the message, default a newline
:param chop: True -> causes lines longer than the screen width to be chopped (truncated) rather than wrapped
- truncated text is still accessible by scrolling with the right & left arrow keys
- chopping is ideal for displaying wide tabular data as is done in utilities like pgcli
False -> causes lines longer than the screen width to wrap to the next line
- wrapping is ideal when you want to keep users from having to use horizontal scrolling
WARNING: On Windows, the text always wraps regardless of what the chop argument is set to
"""
# msg can be any type, so convert to string before checking if it's blank
msg_str = str(msg)
# Consider None to be no data to print
if msg is None or msg_str == '':
return
try:
import subprocess
# Attempt to detect if we are not running within a fully functional terminal.
# Don't try to use the pager when being run by a continuous integration system like Jenkins + pexpect.
functional_terminal = False
if self.stdin.isatty() and self.stdout.isatty():
if sys.platform.startswith('win') or os.environ.get('TERM') is not None:
functional_terminal = True
# Don't attempt to use a pager that can block if redirecting or running a script (either text or Python)
# Also only attempt to use a pager if actually running in a real fully functional terminal
if functional_terminal and not self._redirecting and not self.in_pyscript() and not self.in_script():
if ansi.allow_style.lower() == ansi.STYLE_NEVER.lower():
msg_str = ansi.strip_style(msg_str)
msg_str += end
pager = self.pager
if chop:
pager = self.pager_chop
# Prevent KeyboardInterrupts while in the pager. The pager application will
# still receive the SIGINT since it is in the same process group as us.
with self.sigint_protection:
pipe_proc = subprocess.Popen(pager, shell=True, stdin=subprocess.PIPE)
pipe_proc.communicate(msg_str.encode('utf-8', 'replace'))
else:
self.poutput(msg_str, end=end)
except BrokenPipeError:
# This occurs if a command's output is being piped to another process and that process closes before the
# command is finished. If you would like your application to print a warning message, then set the
# broken_pipe_warning attribute to the message you want printed.`
if self.broken_pipe_warning:
sys.stderr.write(self.broken_pipe_warning)
# ----- Methods related to tab completion -----
def _reset_completion_defaults(self) -> None:
"""
Resets tab completion settings
Needs to be called each time readline runs tab completion
"""
self.allow_appended_space = True
self.allow_closing_quote = True
self.completion_header = ''
self.completion_matches = []
self.display_matches = []
self.matches_delimited = False
self.matches_sorted = False
if rl_type == RlType.GNU:
readline.set_completion_display_matches_hook(self._display_matches_gnu_readline)
elif rl_type == RlType.PYREADLINE:
# noinspection PyUnresolvedReferences
readline.rl.mode._display_completions = self._display_matches_pyreadline
def tokens_for_completion(self, line: str, begidx: int, endidx: int) -> Tuple[List[str], List[str]]:
"""Used by tab completion functions to get all tokens through the one being completed.
:param line: the current input line with leading whitespace removed
:param begidx: the beginning index of the prefix text
:param endidx: the ending index of the prefix text
:return: A 2 item tuple where the items are
**On Success**
- tokens: list of unquoted tokens - this is generally the list needed for tab completion functions
- raw_tokens: list of tokens with any quotes preserved = this can be used to know if a token was quoted
or is missing a closing quote
Both lists are guaranteed to have at least 1 item. The last item in both lists is the token being tab
completed
**On Failure**
- Two empty lists
"""
import copy
unclosed_quote = ''
quotes_to_try = copy.copy(constants.QUOTES)
tmp_line = line[:endidx]
tmp_endidx = endidx
# Parse the line into tokens
while True:
try:
initial_tokens = shlex_split(tmp_line[:tmp_endidx])
# If the cursor is at an empty token outside of a quoted string,
# then that is the token being completed. Add it to the list.
if not unclosed_quote and begidx == tmp_endidx:
initial_tokens.append('')
break
except ValueError as ex:
# Make sure the exception was due to an unclosed quote and
# we haven't exhausted the closing quotes to try
if str(ex) == "No closing quotation" and quotes_to_try:
# Add a closing quote and try to parse again
unclosed_quote = quotes_to_try[0]
quotes_to_try = quotes_to_try[1:]
tmp_line = line[:endidx]
tmp_line += unclosed_quote
tmp_endidx = endidx + 1
else:
# The parsing error is not caused by unclosed quotes.
# Return empty lists since this means the line is malformed.
return [], []
# Further split tokens on punctuation characters
raw_tokens = self.statement_parser.split_on_punctuation(initial_tokens)
# Save the unquoted tokens
tokens = [utils.strip_quotes(cur_token) for cur_token in raw_tokens]
# If the token being completed had an unclosed quote, we need
# to remove the closing quote that was added in order for it
# to match what was on the command line.
if unclosed_quote:
raw_tokens[-1] = raw_tokens[-1][:-1]
return tokens, raw_tokens
def delimiter_complete(self, text: str, line: str, begidx: int, endidx: int,
match_against: Iterable, delimiter: str) -> List[str]:
"""
Performs tab completion against a list but each match is split on a delimiter and only
the portion of the match being tab completed is shown as the completion suggestions.
This is useful if you match against strings that are hierarchical in nature and have a
common delimiter.
An easy way to illustrate this concept is path completion since paths are just directories/files
delimited by a slash. If you are tab completing items in /home/user you don't get the following
as suggestions:
/home/user/file.txt /home/user/program.c
/home/user/maps/ /home/user/cmd2.py
Instead you are shown:
file.txt program.c
maps/ cmd2.py
For a large set of data, this can be visually more pleasing and easier to search.
Another example would be strings formatted with the following syntax: company::department::name
In this case the delimiter would be :: and the user could easily narrow down what they are looking
for if they were only shown suggestions in the category they are at in the string.
:param text: the string prefix we are attempting to match (all matches must begin with it)
:param line: the current input line with leading whitespace removed
:param begidx: the beginning index of the prefix text
:param endidx: the ending index of the prefix text
:param match_against: the list being matched against
:param delimiter: what delimits each portion of the matches (ex: paths are delimited by a slash)
:return: a list of possible tab completions
"""
matches = utils.basic_complete(text, line, begidx, endidx, match_against)
# Display only the portion of the match that's being completed based on delimiter
if matches:
# Set this to True for proper quoting of matches with spaces
self.matches_delimited = True
# Get the common beginning for the matches
common_prefix = os.path.commonprefix(matches)
prefix_tokens = common_prefix.split(delimiter)
# Calculate what portion of the match we are completing
display_token_index = 0
if prefix_tokens:
display_token_index = len(prefix_tokens) - 1
# Get this portion for each match and store them in self.display_matches
for cur_match in matches:
match_tokens = cur_match.split(delimiter)
display_token = match_tokens[display_token_index]
if not display_token:
display_token = delimiter
self.display_matches.append(display_token)
return matches
def flag_based_complete(self, text: str, line: str, begidx: int, endidx: int,
flag_dict: Dict[str, Union[Iterable, Callable]], *,
all_else: Union[None, Iterable, Callable] = None) -> List[str]:
"""Tab completes based on a particular flag preceding the token being completed.
:param text: the string prefix we are attempting to match (all matches must begin with it)
:param line: the current input line with leading whitespace removed
:param begidx: the beginning index of the prefix text
:param endidx: the ending index of the prefix text
:param flag_dict: dictionary whose structure is the following:
`keys` - flags (ex: -c, --create) that result in tab completion for the next argument in the
command line
`values` - there are two types of values:
1. iterable list of strings to match against (dictionaries, lists, etc.)
2. function that performs tab completion (ex: path_complete)
:param all_else: an optional parameter for tab completing any token that isn't preceded by a flag in flag_dict
:return: a list of possible tab completions
"""
# Get all tokens through the one being completed
tokens, _ = self.tokens_for_completion(line, begidx, endidx)
if not tokens:
return []
completions_matches = []
match_against = all_else
# Must have at least 2 args for a flag to precede the token being completed
if len(tokens) > 1:
flag = tokens[-2]
if flag in flag_dict:
match_against = flag_dict[flag]
# Perform tab completion using an Iterable
if isinstance(match_against, Iterable):
completions_matches = utils.basic_complete(text, line, begidx, endidx, match_against)
# Perform tab completion using a function
elif callable(match_against):
completions_matches = match_against(text, line, begidx, endidx)
return completions_matches
def index_based_complete(self, text: str, line: str, begidx: int, endidx: int,
index_dict: Mapping[int, Union[Iterable, Callable]], *,
all_else: Union[None, Iterable, Callable] = None) -> List[str]:
"""Tab completes based on a fixed position in the input string.
:param text: the string prefix we are attempting to match (all matches must begin with it)
:param line: the current input line with leading whitespace removed
:param begidx: the beginning index of the prefix text
:param endidx: the ending index of the prefix text
:param index_dict: dictionary whose structure is the following:
`keys` - 0-based token indexes into command line that determine which tokens perform tab
completion
`values` - there are two types of values:
1. iterable list of strings to match against (dictionaries, lists, etc.)
2. function that performs tab completion (ex: path_complete)
:param all_else: an optional parameter for tab completing any token that isn't at an index in index_dict
:return: a list of possible tab completions
"""
# Get all tokens through the one being completed
tokens, _ = self.tokens_for_completion(line, begidx, endidx)
if not tokens:
return []
matches = []
# Get the index of the token being completed
index = len(tokens) - 1
# Check if token is at an index in the dictionary
if index in index_dict:
match_against = index_dict[index]
else:
match_against = all_else
# Perform tab completion using a Iterable
if isinstance(match_against, Iterable):
matches = utils.basic_complete(text, line, begidx, endidx, match_against)
# Perform tab completion using a function
elif callable(match_against):
matches = match_against(text, line, begidx, endidx)
return matches
# noinspection PyUnusedLocal
def path_complete(self, text: str, line: str, begidx: int, endidx: int, *,
path_filter: Optional[Callable[[str], bool]] = None) -> List[str]:
"""Performs completion of local file system paths
:param text: the string prefix we are attempting to match (all matches must begin with it)
:param line: the current input line with leading whitespace removed
:param begidx: the beginning index of the prefix text
:param endidx: the ending index of the prefix text
:param path_filter: optional filter function that determines if a path belongs in the results
this function takes a path as its argument and returns True if the path should
be kept in the results
:return: a list of possible tab completions
"""
# Used to complete ~ and ~user strings
def complete_users() -> List[str]:
# We are returning ~user strings that resolve to directories,
# so don't append a space or quote in the case of a single result.
self.allow_appended_space = False
self.allow_closing_quote = False
users = []
# Windows lacks the pwd module so we can't get a list of users.
# Instead we will return a result once the user enters text that
# resolves to an existing home directory.
if sys.platform.startswith('win'):
expanded_path = os.path.expanduser(text)
if os.path.isdir(expanded_path):
user = text
if add_trailing_sep_if_dir:
user += os.path.sep
users.append(user)
else:
import pwd
# Iterate through a list of users from the password database
for cur_pw in pwd.getpwall():
# Check if the user has an existing home dir
if os.path.isdir(cur_pw.pw_dir):
# Add a ~ to the user to match against text
cur_user = '~' + cur_pw.pw_name
if cur_user.startswith(text):
if add_trailing_sep_if_dir:
cur_user += os.path.sep
users.append(cur_user)
return users
# Determine if a trailing separator should be appended to directory completions
add_trailing_sep_if_dir = False
if endidx == len(line) or (endidx < len(line) and line[endidx] != os.path.sep):
add_trailing_sep_if_dir = True
# Used to replace cwd in the final results
cwd = os.getcwd()
cwd_added = False
# Used to replace expanded user path in final result
orig_tilde_path = ''
expanded_tilde_path = ''
# If the search text is blank, then search in the CWD for *
if not text:
search_str = os.path.join(os.getcwd(), '*')
cwd_added = True
else:
# Purposely don't match any path containing wildcards
wildcards = ['*', '?']
for wildcard in wildcards:
if wildcard in text:
return []
# Start the search string
search_str = text + '*'
# Handle tilde expansion and completion
if text.startswith('~'):
sep_index = text.find(os.path.sep, 1)
# If there is no slash, then the user is still completing the user after the tilde
if sep_index == -1:
return complete_users()
# Otherwise expand the user dir
else:
search_str = os.path.expanduser(search_str)
# Get what we need to restore the original tilde path later
orig_tilde_path = text[:sep_index]
expanded_tilde_path = os.path.expanduser(orig_tilde_path)
# If the search text does not have a directory, then use the cwd
elif not os.path.dirname(text):
search_str = os.path.join(os.getcwd(), search_str)
cwd_added = True
# Set this to True for proper quoting of paths with spaces
self.matches_delimited = True
# Find all matching path completions
matches = glob.glob(search_str)
# Filter out results that don't belong
if path_filter is not None:
matches = [c for c in matches if path_filter(c)]
# Don't append a space or closing quote to directory
if len(matches) == 1 and os.path.isdir(matches[0]):
self.allow_appended_space = False
self.allow_closing_quote = False
# Sort the matches before any trailing slashes are added
matches.sort(key=self.default_sort_key)
self.matches_sorted = True
# Build display_matches and add a slash to directories
for index, cur_match in enumerate(matches):
# Display only the basename of this path in the tab completion suggestions
self.display_matches.append(os.path.basename(cur_match))
# Add a separator after directories if the next character isn't already a separator
if os.path.isdir(cur_match) and add_trailing_sep_if_dir:
matches[index] += os.path.sep
self.display_matches[index] += os.path.sep
# Remove cwd if it was added to match the text readline expects
if cwd_added:
if cwd == os.path.sep:
to_replace = cwd
else:
to_replace = cwd + os.path.sep
matches = [cur_path.replace(to_replace, '', 1) for cur_path in matches]
# Restore the tilde string if we expanded one to match the text readline expects
if expanded_tilde_path:
matches = [cur_path.replace(expanded_tilde_path, orig_tilde_path, 1) for cur_path in matches]
return matches
def shell_cmd_complete(self, text: str, line: str, begidx: int, endidx: int, *,
complete_blank: bool = False) -> List[str]:
"""Performs completion of executables either in a user's path or a given path
:param text: the string prefix we are attempting to match (all matches must begin with it)
:param line: the current input line with leading whitespace removed
:param begidx: the beginning index of the prefix text
:param endidx: the ending index of the prefix text
:param complete_blank: If True, then a blank will complete all shell commands in a user's path. If False, then
no completion is performed. Defaults to False to match Bash shell behavior.
:return: a list of possible tab completions
"""
# Don't tab complete anything if no shell command has been started
if not complete_blank and not text:
return []
# If there are no path characters in the search text, then do shell command completion in the user's path
if not text.startswith('~') and os.path.sep not in text:
return utils.get_exes_in_path(text)
# Otherwise look for executables in the given path
else:
return self.path_complete(text, line, begidx, endidx,
path_filter=lambda path: os.path.isdir(path) or os.access(path, os.X_OK))