/
completion.py
executable file
·1239 lines (1020 loc) · 39.3 KB
/
completion.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python2
# Copyright 2016 Andy Chu. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
"""
completion.py - Tab completion.
Architecture:
Completion should run in threads? For two reasons:
- Completion can be slow -- e.g. completion for distributed resources
- Because readline has a weird interface, and then you can implement
"iterators" in C++ or oil. They just push onto a PIPE. Use a netstring
protocol and self-pipe?
- completion can be in another process anyway?
Does that mean the user code gets run in an entirely separate interpreter? The
whole lexer/parser/cmd_eval combo has to be thread-safe. Does it get a copy of
the same startup state?
Features TODO:
- complete flags after alias expansion
- complete history expansions like zsh
- complete flags for all builtins, using frontend/args.py?
- might need a special error token
bash note: most of this stuff is in pcomplete.c and bashline.c (4K lines!).
Uses ITEMLIST with a bunch of flags.
"""
from __future__ import print_function
import pwd
import time
from _devbuild.gen.id_kind_asdl import Id
from _devbuild.gen.syntax_asdl import word_part_e, redir_param_e
from _devbuild.gen.runtime_asdl import value_e, value__Str
from _devbuild.gen.types_asdl import redir_arg_type_e
from core import error
from core.pyutil import stderr_line
from core import ui
from core import util
from core.util import log
from frontend import consts
from frontend import reader
from pylib import os_path
from pylib import path_stat
from osh import word_
from core import state
from osh.string_ops import ShellQuoteB
import libc
import posix_ as posix
from typing import (
Dict, Tuple, List, Optional, Iterator, Union, Callable, Any, TYPE_CHECKING
)
if TYPE_CHECKING:
from _devbuild.gen.syntax_asdl import Token, compound_word, command__ShFunction
from core.alloc import Arena
from core.comp_ui import State
from core.state import Mem
from core.util import DebugFile
from frontend.parse_lib import ParseContext
from osh.builtin_comp import _FixedWordsAction
from osh.cmd_eval import CommandEvaluator
from osh.split import SplitContext
from osh.word_eval import AbstractWordEvaluator
# To quote completion candidates.
# ! is for history expansion, which only happens interactively, but
# completion only does too.
# *?[] are for globs
# {} are for brace expansion
# ~ in filenames should be quoted
#
# TODO: Also escape tabs as \t and newlines at \n?
SHELL_META_CHARS = r' ~`!$&|;()\"*?[]{}<>' + "'"
class _RetryCompletion(Exception):
"""For the 'exit 124' protocol."""
pass
CH_Break, CH_Other = xrange(2) # Character types
ST_Begin, ST_Break, ST_Other = xrange(3) # States
# State machine definition.
_TRANSITIONS = {
# (state, char) -> (new state, emit span)
(ST_Begin, CH_Break): (ST_Break, False),
(ST_Begin, CH_Other): (ST_Other, False),
(ST_Break, CH_Break): (ST_Break, False),
(ST_Break, CH_Other): (ST_Other, True),
(ST_Other, CH_Break): (ST_Break, True),
(ST_Other, CH_Other): (ST_Other, False),
}
def AdjustArg(arg, break_chars, argv_out):
# type: (str, List[str], List[str]) -> None
end_indices = [] # stores the end of each span
state = ST_Begin
for i, c in enumerate(arg):
ch = CH_Break if c in break_chars else CH_Other
state, emit_span = _TRANSITIONS[state, ch]
if emit_span:
end_indices.append(i)
# Always emit a span at the end (even for empty string)
end_indices.append(len(arg))
begin = 0
for end in end_indices:
argv_out.append(arg[begin:end])
begin = end
class NullCompleter(object):
def Matches(self, comp):
# type: (Api) -> List
return []
# NOTE: How to create temporary options? With copy.deepcopy()?
# We might want that as a test for OVM. Copying is similar to garbage
# collection in that you walk a graph.
# These values should never be mutated.
_DEFAULT_OPTS = {}
_DO_NOTHING = (_DEFAULT_OPTS, NullCompleter())
class OptionState(object):
"""Stores the compopt state of the CURRENT completion."""
def __init__(self):
# type: () -> None
# For the IN-PROGRESS completion.
self.currently_completing = False
# should be SET to a COPY of the registration options by the completer.
self.dynamic_opts = None
class Lookup(object):
"""Stores completion hooks registered by the user."""
def __init__(self):
# type: () -> None
# command name -> UserSpec
# Pseudo-commands __first and __fallback are for -E and -D.
self.lookup = {
'__fallback': _DO_NOTHING,
'__first': _DO_NOTHING,
}
self.commands_with_spec_changes = [] # for the 124 protocol
# So you can register *.sh, unlike bash. List of (glob, [actions]),
# searched linearly.
self.patterns = []
def __str__(self):
return '<completion.Lookup %s>' % self.lookup
def PrintSpecs(self):
# type: () -> None
"""For 'complete' without args."""
for name in sorted(self.lookup):
base_opts, user_spec = self.lookup[name]
print('%-15s %s %s' % (name, base_opts, user_spec))
print('---')
for pat, spec in self.patterns:
print('%s = %s' % (pat, spec))
def ClearCommandsChanged(self):
# type: () -> None
del self.commands_with_spec_changes[:]
def GetCommandsChanged(self):
# type: () -> None
return self.commands_with_spec_changes
def RegisterName(self, name, base_opts, user_spec):
# type: (str, Dict, UserSpec) -> None
"""Register a completion action with a name.
Used by the 'complete' builtin.
"""
self.lookup[name] = (base_opts, user_spec)
if name not in ('__fallback', '__first'):
self.commands_with_spec_changes.append(name)
def RegisterGlob(self, glob_pat, base_opts, user_spec):
# type: (str, Any, UserSpec) -> None
self.patterns.append((glob_pat, base_opts, user_spec))
def GetSpecForName(self, argv0):
# type: (str) -> Tuple[None, None]
"""
Args:
argv0: A finished argv0 to lookup
"""
pair = self.lookup.get(argv0) # NOTE: Could be ''
if pair:
return pair
key = os_path.basename(argv0)
pair = self.lookup.get(key)
if pair:
return pair
for glob_pat, base_opts, user_spec in self.patterns:
#log('Matching %r %r', key, glob_pat)
if libc.fnmatch(glob_pat, key, True): # support extglob
return base_opts, user_spec
return None, None
def GetFirstSpec(self):
# type: () -> Tuple[Dict, UserSpec]
return self.lookup['__first']
def GetFallback(self):
# type: () -> Tuple[Dict, UserSpec]
return self.lookup['__fallback']
class Api(object):
def __init__(self, line='', begin=0, end=0):
# type: (str, int, int) -> None
"""
Args:
index: if -1, then we're running through compgen
"""
self.line = line
self.begin = begin
self.end = end
# NOTE: COMP_WORDBREAKS is initialized in Mem().
# NOTE: to_complete could be 'cur'
def Update(self, first='', to_complete='', prev='', index=0,
partial_argv=None):
# type: (str, str, str, int, List[str]) -> None
"""Added after we've done parsing."""
self.first = first
self.to_complete = to_complete
self.prev = prev
self.index = index # COMP_CWORD
# COMP_ARGV and COMP_WORDS can be derived from this
self.partial_argv = partial_argv or []
def __repr__(self):
# type: () -> str
"""For testing"""
return '<Api %r %d-%d>' % (self.line, self.begin, self.end)
#
# Actions
#
class CompletionAction(object):
"""Returns a list of words.
Function
Literal words
"""
def __init__(self):
# type: () -> None
pass
def Matches(self, comp):
pass
def __repr__(self):
# type: () -> str
return self.__class__.__name__
class UsersAction(CompletionAction):
"""complete -A user"""
def Matches(self, comp):
for u in pwd.getpwall():
name = u.pw_name
if name.startswith(comp.to_complete):
yield name
class TestAction(CompletionAction):
def __init__(self, words, delay=None):
# type: (List[str], Optional[float]) -> None
self.words = words
self.delay = delay
def Matches(self, comp):
# type: (Api) -> Iterator[Union[Iterator, Iterator[str]]]
for w in self.words:
if w.startswith(comp.to_complete):
if self.delay:
time.sleep(self.delay)
yield w
class DynamicWordsAction(CompletionAction):
""" compgen -W '$(echo one two three)' """
def __init__(self,
word_ev, # type: AbstractWordEvaluator
splitter, # type: SplitContext
arg_word, # type: compound_word
arena, # type: Arena
):
# type: (...) -> None
self.word_ev = word_ev
self.splitter = splitter
self.arg_word = arg_word
self.arena = arena
def Matches(self, comp):
# type: (Api) -> Iterator[Union[Iterator, Iterator[str]]]
try:
val = self.word_ev.EvalWordToString(self.arg_word)
except error.FatalRuntime as e:
ui.PrettyPrintError(e, self.arena)
raise
# SplitForWordEval() Allows \ escapes
candidates = self.splitter.SplitForWordEval(val.s)
for c in candidates:
if c.startswith(comp.to_complete):
yield c
class FileSystemAction(CompletionAction):
"""Complete paths from the file system.
Directories will have a / suffix.
"""
def __init__(self, dirs_only=False, exec_only=False, add_slash=False):
# type: (bool, bool, bool) -> None
self.dirs_only = dirs_only
self.exec_only = exec_only
# This is for redirects, not for UserSpec, which should respect compopt -o
# filenames.
self.add_slash = add_slash # for directories
def Matches(self, comp):
# type: (Api) -> Iterator[Union[Iterator, Iterator[str]]]
to_complete = comp.to_complete
# Problem: .. and ../.. don't complete /.
# TODO: Set display_pos before fixing this.
#import os
#to_complete = os.path.normpath(to_complete)
dirname, basename = os_path.split(to_complete)
if dirname == '': # We're completing in this directory
to_list = '.'
else: # We're completing in some other directory
to_list = dirname
if 0:
log('basename %r', basename)
log('to_list %r', to_list)
log('dirname %r', dirname)
try:
names = posix.listdir(to_list)
except OSError as e:
return # nothing
for name in names:
path = os_path.join(dirname, name)
if path.startswith(to_complete):
if self.dirs_only: # add_slash not used here
# NOTE: There is a duplicate isdir() check later to add a trailing
# slash. Consolidate the checks for fewer stat() ops. This is hard
# because all the completion actions must obey the same interface.
# We could have another type like candidate = File | Dir |
# OtherString ?
if path_stat.isdir(path):
yield path
continue
if self.exec_only:
# TODO: Handle exception if file gets deleted in between listing and
# check?
if not posix.access(path, posix.X_OK_):
continue
if self.add_slash and path_stat.isdir(path):
yield path + '/'
else:
yield path
class ShellFuncAction(CompletionAction):
"""Call a user-defined function using bash's completion protocol."""
def __init__(self, cmd_ev, func, comp_lookup):
# type: (CommandEvaluator, command__ShFunction, Lookup) -> None
"""
Args:
comp_lookup: For the 124 protocol: test if the user-defined function
registered a new UserSpec.
"""
self.cmd_ev = cmd_ev
self.func = func
self.comp_lookup = comp_lookup
def __repr__(self):
# TODO: Add file and line number here!
return '<ShellFuncAction %s>' % (self.func.name,)
def log(self, *args):
# type: (*Any) -> None
self.cmd_ev.debug_f.log(*args)
def Matches(self, comp):
# type: (Api) -> List[str]
# Have to clear the response every time. TODO: Reuse the object?
state.SetGlobalArray(self.cmd_ev.mem, 'COMPREPLY', [])
# New completions should use COMP_ARGV, a construct specific to OSH>
state.SetGlobalArray(self.cmd_ev.mem, 'COMP_ARGV', comp.partial_argv)
# Old completions may use COMP_WORDS. It is split by : and = to emulate
# bash's behavior.
# More commonly, they will call _init_completion and use the 'words' output
# of that, ignoring COMP_WORDS.
comp_words = []
for a in comp.partial_argv:
AdjustArg(a, [':', '='], comp_words)
if comp.index == -1: # cmopgen
comp_cword = comp.index
else:
comp_cword = len(comp_words) - 1 # weird invariant
state.SetGlobalArray(self.cmd_ev.mem, 'COMP_WORDS', comp_words)
state.SetGlobalString(self.cmd_ev.mem, 'COMP_CWORD', str(comp_cword))
state.SetGlobalString(self.cmd_ev.mem, 'COMP_LINE', comp.line)
state.SetGlobalString(self.cmd_ev.mem, 'COMP_POINT', str(comp.end))
argv = [comp.first, comp.to_complete, comp.prev]
self.log('Running completion function %r with arguments %s',
self.func.name, argv)
self.comp_lookup.ClearCommandsChanged()
status = self.cmd_ev.RunFuncForCompletion(self.func, argv)
commands_changed = self.comp_lookup.GetCommandsChanged()
self.log('comp.first %s, commands_changed: %s', comp.first,
commands_changed)
if status == 124:
cmd = os_path.basename(comp.first)
if cmd in commands_changed:
self.log('Got status 124 from %r and %s commands changed',
self.func.name, commands_changed)
raise _RetryCompletion()
else:
# This happens with my own completion scripts. bash doesn't show an
# error.
self.log(
"Function %r returned 124, but the completion spec for %r wasn't "
"changed", self.func.name, cmd)
return []
# Read the response.
# NOTE: 'COMP_REPLY' would follow the naming convention!
val = state.GetGlobal(self.cmd_ev.mem, 'COMPREPLY')
if val.tag_() == value_e.Undef:
# We set it above, so this error would only happen if the user unset it.
# Not changing it means there were no completions.
# TODO: This writes over the command line; it would be better to use an
# error object.
stderr_line('osh: Ran function %r but COMPREPLY was unset',
self.func.name)
return []
if val.tag_() != value_e.MaybeStrArray:
log('ERROR: COMPREPLY should be an array, got %s', val)
return []
self.log('COMPREPLY %s', val)
# Return this all at once so we don't have a generator. COMPREPLY happens
# all at once anyway.
return val.strs
class VariablesAction(CompletionAction):
"""compgen -A variable."""
def __init__(self, mem):
# type: (Mem) -> None
self.mem = mem
def Matches(self, comp):
for var_name in self.mem.VarNames():
yield var_name
class ExternalCommandAction(CompletionAction):
"""Complete commands in $PATH.
This is PART of compge -A command.
"""
def __init__(self, mem):
# type: (Mem) -> None
"""
Args:
mem: for looking up Path
"""
self.mem = mem
# Should we list everything executable in $PATH here? And then whenever
# $PATH is changed, regenerated it?
# Or we can cache directory listings? What if the contents of the dir
# changed?
# Can we look at the dir timestamp?
#
# (dir, timestamp) -> list of entries perhaps? And then every time you hit
# tab, do you have to check the timestamp? It should be cached by the
# kernel, so yes.
self.ext = []
# (dir, timestamp) -> list
# NOTE: This cache assumes that listing a directory is slower than statting
# it to get the mtime. That may not be true on all systems? Either way
# you are reading blocks of metadata. But I guess /bin on many systems is
# huge, and will require lots of sys calls.
self.cache = {}
def Matches(self, comp):
# type: (Api) -> Iterator[Union[Iterator, Iterator[str]]]
"""
TODO: Cache is never cleared.
- When we get a newer timestamp, we should clear the old one.
- When PATH is changed, we can remove old entries.
"""
val = self.mem.GetVar('PATH')
if val.tag_() != value_e.Str:
# No matches if not a string
return
assert isinstance(val, value__Str) # for MyPy
path_dirs = val.s.split(':')
#log('path: %s', path_dirs)
executables = [] # type: List[str]
for d in path_dirs:
try:
st = posix.stat(d)
except OSError as e:
# There could be a directory that doesn't exist in the $PATH.
continue
key = (d, st.st_mtime)
dir_exes = self.cache.get(key)
if dir_exes is None:
entries = posix.listdir(d)
dir_exes = []
for name in entries:
path = os_path.join(d, name)
# TODO: Handle exception if file gets deleted in between listing and
# check?
if not posix.access(path, posix.X_OK_):
continue
dir_exes.append(name) # append the name, not the path
self.cache[key] = dir_exes
executables.extend(dir_exes)
# TODO: Shouldn't do the prefix / space thing ourselves. readline does
# that at the END of the line.
for word in executables:
if word.startswith(comp.to_complete):
yield word
class GlobPredicate(object):
"""Expand into files that match a pattern. !*.py filters them.
Weird syntax:
-X *.py or -X !*.py
Also & is a placeholder for the string being completed?. Yeah I probably
want to get rid of this feature.
"""
def __init__(self, include, glob_pat):
# type: (bool, str) -> None
self.include = include # True for inclusion, False for exclusion
self.glob_pat = glob_pat # extended glob syntax supported
def __call__(self, candidate):
# type: (str) -> int
"""Should we INCLUDE the candidate or not?"""
matched = libc.fnmatch(self.glob_pat, candidate, True)
# This is confusing because of bash's double-negative syntax
if self.include:
return not matched
else:
return matched
def __repr__(self):
return '<GlobPredicate %s %r>' % (self.include, self.glob_pat)
def DefaultPredicate(candidate):
# type: (str) -> bool
return True
class UserSpec(object):
"""The user configuration for completion.
- The compgen builtin exposes this DIRECTLY.
- Readline must call ReadlineCallback, which uses RootCompleter.
"""
def __init__(self,
actions, # type: Union[List[FileSystemAction], List[TestAction], List[_FixedWordsAction]]
extra_actions, # type: List
else_actions, # type: List
predicate, # type: Callable
prefix='', # type: str
suffix='', # type: str
):
# type: (...) -> None
self.actions = actions
self.extra_actions = extra_actions
self.else_actions = else_actions
self.predicate = predicate # for -X
self.prefix = prefix
self.suffix = suffix
def Matches(self, comp):
# type: (Api) -> Iterator[Union[Iterator, Iterator[Tuple[str, bool]]]]
"""Yield completion candidates."""
num_matches = 0
for a in self.actions:
is_fs_action = isinstance(a, FileSystemAction)
for match in a.Matches(comp):
# Special case hack to match bash for compgen -F. It doesn't filter by
# to_complete!
show = (
self.predicate(match) and
# ShellFuncAction results are NOT filtered by prefix!
(match.startswith(comp.to_complete) or
isinstance(a, ShellFuncAction))
)
# There are two kinds of filters: changing the string, and filtering
# the set of strings. So maybe have modifiers AND filters? A triple.
if show:
yield self.prefix + match + self.suffix, is_fs_action
num_matches += 1
# NOTE: extra_actions and else_actions don't respect -X, -P or -S, and we
# don't have to filter by startswith(comp.to_complete). They are all all
# FileSystemActions, which do it already.
# for -o plusdirs
for a in self.extra_actions:
for match in a.Matches(comp):
yield match, True # We know plusdirs is a file system action
# for -o default and -o dirnames
if num_matches == 0:
for a in self.else_actions:
for match in a.Matches(comp):
yield match, True # both are FileSystemAction
# What if the cursor is not at the end of line? See readline interface.
# That's OK -- we just truncate the line at the cursor?
# Hm actually zsh does something smarter, and which is probably preferable.
# It completes the word that
def __str__(self):
# type: () -> str
parts = ['(UserSpec']
if self.actions:
parts.append(str(self.actions))
if self.extra_actions:
parts.append('extra=%s' % self.extra_actions)
if self.else_actions:
parts.append('else=%s' % self.else_actions)
if self.predicate is not DefaultPredicate:
parts.append('pred = %s' % self.predicate)
if self.prefix:
parts.append('prefix=%r' % self.prefix)
if self.suffix:
parts.append('suffix=%r' % self.suffix)
return ' '.join(parts) + ')'
# Helpers for Matches()
# NOTE: We could add Lit_Dollar, but it would affect many lexer modes.
def IsDollar(t):
# type: (Token) -> bool
return t.id == Id.Lit_Other and t.val == '$'
def IsDummy(t):
# type: (Token) -> bool
return t.id == Id.Lit_CompDummy
def WordEndsWithCompDummy(w):
# type: (compound_word) -> bool
last_part = w.parts[-1]
return (
last_part.tag_() == word_part_e.Literal and
last_part.id == Id.Lit_CompDummy
)
class RootCompleter(object):
"""Dispatch to various completers.
- Complete the OSH language (variables, etc.), or
- Statically evaluate argv and dispatch to a command completer.
"""
def __init__(self,
word_ev, # type: AbstractWordEvaluator
mem, # type: Mem
comp_lookup, # type: Lookup
compopt_state, # type: OptionState
comp_ui_state, # type: State
parse_ctx, # type: ParseContext
debug_f, # type: DebugFile
):
# type: (...) -> None
self.word_ev = word_ev # for static evaluation of words
self.mem = mem # to complete variable names
self.comp_lookup = comp_lookup
self.compopt_state = compopt_state # for compopt builtin
self.comp_ui_state = comp_ui_state
self.parse_ctx = parse_ctx
self.debug_f = debug_f
def Matches(self, comp):
# type: (Api) -> Iterator[Union[Iterator, Iterator[str]]]
"""
Args:
comp: Callback args from readline. Readline uses set_completer_delims to
tokenize the string.
Returns a list of matches relative to readline's completion_delims.
We have to post-process the output of various completers.
"""
arena = self.parse_ctx.arena # Used by inner functions
# Pass the original line "out of band" to the completion callback.
line_until_tab = comp.line[:comp.end]
self.comp_ui_state.line_until_tab = line_until_tab
self.parse_ctx.trail.Clear()
line_reader = reader.StringLineReader(line_until_tab, self.parse_ctx.arena)
c_parser = self.parse_ctx.MakeOshParser(line_reader, emit_comp_dummy=True)
# We want the output from parse_ctx, so we don't use the return value.
try:
c_parser.ParseLogicalLine()
except error.Parse as e:
# e.g. 'ls | ' will not parse. Now inspect the parser state!
pass
debug_f = self.debug_f
trail = self.parse_ctx.trail
if 1:
trail.PrintDebugString(debug_f)
#
# First try completing the shell language itself.
#
# NOTE: We get Eof_Real in the command state, but not in the middle of a
# BracedVarSub. This is due to the difference between the CommandParser
# and WordParser.
tokens = trail.tokens
last = -1
if tokens[-1].id == Id.Eof_Real:
last -= 1 # ignore it
try:
t1 = tokens[last]
except IndexError:
t1 = None
try:
t2 = tokens[last-1]
except IndexError:
t2 = None
debug_f.log('line: %r', comp.line)
debug_f.log('rl_slice from byte %d to %d: %r', comp.begin, comp.end,
comp.line[comp.begin:comp.end])
debug_f.log('t1 %s', t1)
debug_f.log('t2 %s', t2)
# Each of the 'yield' statements below returns a fully-completed line, to
# appease the readline library. The root cause of this dance: If there's
# one candidate, readline is responsible for redrawing the input line. OSH
# only displays candidates and never redraws the input line.
def _TokenStart(tok):
# type: (Token) -> int
span = arena.GetLineSpan(tok.span_id)
return span.col
if t2: # We always have t1?
# echo $
if IsDollar(t2) and IsDummy(t1):
self.comp_ui_state.display_pos = _TokenStart(t2) + 1 # 1 for $
for name in self.mem.VarNames():
yield line_until_tab + name # no need to quote var names
return
# echo ${
if t2.id == Id.Left_DollarBrace and IsDummy(t1):
self.comp_ui_state.display_pos = _TokenStart(t2) + 2 # 2 for ${
for name in self.mem.VarNames():
yield line_until_tab + name # no need to quote var names
return
# echo $P
if t2.id == Id.VSub_DollarName and IsDummy(t1):
# Example: ${undef:-$P
# readline splits at ':' so we have to prepend '-$' to every completed
# variable name.
self.comp_ui_state.display_pos = _TokenStart(t2) + 1 # 1 for $
to_complete = t2.val[1:]
n = len(to_complete)
for name in self.mem.VarNames():
if name.startswith(to_complete):
yield line_until_tab + name[n:] # no need to quote var names
return
# echo ${P
if t2.id == Id.VSub_Name and IsDummy(t1):
self.comp_ui_state.display_pos = _TokenStart(t2) # no offset
to_complete = t2.val
n = len(to_complete)
for name in self.mem.VarNames():
if name.startswith(to_complete):
yield line_until_tab + name[n:] # no need to quote var names
return
# echo $(( VAR
if t2.id == Id.Lit_ArithVarLike and IsDummy(t1):
self.comp_ui_state.display_pos = _TokenStart(t2) # no offset
to_complete = t2.val
n = len(to_complete)
for name in self.mem.VarNames():
if name.startswith(to_complete):
yield line_until_tab + name[n:] # no need to quote var names
return
if trail.words:
# echo ~<TAB>
# echo ~a<TAB> $(home dirs)
# This must be done at a word level, and TildeDetectAll() does NOT help
# here, because they don't have trailing slashes yet! We can't do it on
# tokens, because otherwise f~a will complete. Looking at word_part is
# EXACTLY what we want.
parts = trail.words[-1].parts
if (len(parts) == 2 and
parts[0].tag_() == word_part_e.Literal and
parts[1].tag_() == word_part_e.Literal and
parts[0].id == Id.Lit_TildeLike and
parts[1].id == Id.Lit_CompDummy):
t2 = parts[0]
# +1 for ~
self.comp_ui_state.display_pos = _TokenStart(parts[0]) + 1
to_complete = t2.val[1:]
n = len(to_complete)
for u in pwd.getpwall(): # catch errors?
name = u.pw_name
if name.startswith(to_complete):
yield line_until_tab + ShellQuoteB(name[n:]) + '/'
return
# echo hi > f<TAB> (complete redirect arg)
if trail.redirects:
r = trail.redirects[-1]
# Only complete 'echo >', but not 'echo >&' or 'cat <<'
# TODO: Don't complete <<< 'h'
if (r.arg.tag_() == redir_param_e.Word and
consts.RedirArgType(r.op.id) == redir_arg_type_e.Path):
arg_word = r.arg
if WordEndsWithCompDummy(arg_word):
debug_f.log('Completing redirect arg')
try:
val = self.word_ev.EvalWordToString(r.arg)
except error.FatalRuntime as e:
debug_f.log('Error evaluating redirect word: %s', e)
return
if val.tag_() != value_e.Str:
debug_f.log("Didn't get a string from redir arg")
return
span_id = word_.LeftMostSpanForWord(arg_word)
span = arena.GetLineSpan(span_id)
self.comp_ui_state.display_pos = span.col
comp.Update(to_complete=val.s) # FileSystemAction uses only this
n = len(val.s)
action = FileSystemAction(add_slash=True)
for name in action.Matches(comp):
yield line_until_tab + ShellQuoteB(name[n:])
return
#
# We're not completing the shell language. Delegate to user-defined
# completion for external tools.
#
# Set below, and set on retries.
base_opts = None
user_spec = None
# Used on retries.
partial_argv = []
num_partial = -1
first = None
if trail.words:
# Now check if we're completing a word!
if WordEndsWithCompDummy(trail.words[-1]):
debug_f.log('Completing words')
#
# It didn't look like we need to complete var names, tilde, redirects,
# etc. Now try partial_argv, which may involve invoking PLUGINS.
# needed to complete paths with ~
words2 = word_.TildeDetectAll(trail.words)
if 0:
debug_f.log('After tilde detection')
for w in words2:
print(w, file=debug_f)
if 0:
debug_f.log('words2:')
for w2 in words2:
debug_f.log(' %s', w2)
for w in words2:
try:
# TODO:
# - Should we call EvalWordSequence? But turn globbing off? It
# can do splitting and such.
# - We could have a variant to eval TildeSub to ~ ?
val = self.word_ev.EvalWordToString(w)
except error.FatalRuntime:
# Why would it fail?
continue
if val.tag_() == value_e.Str:
partial_argv.append(val.s)
else:
pass
debug_f.log('partial_argv: %s', partial_argv)
num_partial = len(partial_argv)
first = partial_argv[0]
alias_first = None
debug_f.log('alias_words: %s', trail.alias_words)
if trail.alias_words:
w = trail.alias_words[0]
try:
val = self.word_ev.EvalWordToString(w)
except error.FatalRuntime:
pass
alias_first = val.s
debug_f.log('alias_first: %s', alias_first)