Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions Lib/_pyrepl/base_eventqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,16 @@
from .trace import trace

class BaseEventQueue:
_ESCAPE_TIMEOUT_MS = 50

def __init__(self, encoding: str, keymap_dict: dict[bytes, str]) -> None:
self.compiled_keymap = keymap.compile_keymap(keymap_dict)
self.keymap = self.compiled_keymap
trace("keymap {k!r}", k=self.keymap)
self.encoding = encoding
self.events: deque[Event] = deque()
self.buf = bytearray()
self._pending_escape_deadline: float | None = None

def get(self) -> Event | None:
"""
Expand Down Expand Up @@ -69,6 +72,50 @@ def insert(self, event: Event) -> None:
trace('added event {event}', event=event)
self.events.append(event)

def has_pending_escape_sequence(self) -> bool:
"""
Check if there's a potential escape sequence waiting for more input.

Returns True if we have exactly one byte (ESC) in the buffer and
we're in the middle of keymap navigation, indicating we're waiting
to see if more bytes will arrive to complete an escape sequence.
"""
return (
len(self.buf) == 1
and self.buf[0] == 27 # ESC byte
and self.keymap is not self.compiled_keymap
)

def should_emit_standalone_escape(self, current_time_ms: float) -> bool:
"""
Check if a pending ESC should be emitted as a standalone escape key.
"""
if not self.has_pending_escape_sequence():
return False

if self._pending_escape_deadline is None:
# First time checking - set the deadline
self._pending_escape_deadline = current_time_ms + self._ESCAPE_TIMEOUT_MS
return False

# Check if the deadline has passed
return current_time_ms >= self._pending_escape_deadline

def emit_standalone_escape(self) -> None:
"""
Emit the buffered ESC byte as a standalone escape key event.
"""
self.keymap = self.compiled_keymap
# Standalone ESC event
self.insert(Event('key', '\033', b'\033'))

# Just in case there are remaining bytes in the buffer
remaining = self.flush_buf()[1:]
for byte in remaining:
self.push(byte)

self._pending_escape_deadline = None

def push(self, char: int | bytes) -> None:
"""
Processes a character by updating the buffer and handling special key mappings.
Expand All @@ -78,6 +125,9 @@ def push(self, char: int | bytes) -> None:
char = ord_char.to_bytes()
self.buf.append(ord_char)

if self._pending_escape_deadline is not None:
self._pending_escape_deadline = None

if char in self.keymap:
if self.keymap is self.compiled_keymap:
# sanity check, buffer is empty when a special key comes
Expand Down
95 changes: 91 additions & 4 deletions Lib/_pyrepl/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,20 +325,41 @@ def do(self) -> None:
b = r.buffer
for _ in range(r.get_arg()):
p = r.pos + 1
if p <= len(b):
r.pos = p
# In vi normal mode, don't move past the last character
if r.editor_mode.is_normal():
eol_pos = r.eol()
max_pos = max(r.bol(), eol_pos - 1) if eol_pos > r.bol() else r.bol()
if p <= max_pos:
r.pos = p
else:
self.reader.error("end of line")
else:
self.reader.error("end of buffer")
if p <= len(b):
r.pos = p
else:
self.reader.error("end of buffer")


class beginning_of_line(MotionCommand):
def do(self) -> None:
self.reader.pos = self.reader.bol()


class first_non_whitespace_character(MotionCommand):
def do(self) -> None:
self.reader.pos = self.reader.first_non_whitespace()


class end_of_line(MotionCommand):
def do(self) -> None:
self.reader.pos = self.reader.eol()
r = self.reader
eol_pos = r.eol()
if r.editor_mode.is_normal():
bol_pos = r.bol()
# Don't go past the last character (but stay at bol if line is empty)
r.pos = max(bol_pos, eol_pos - 1) if eol_pos > bol_pos else bol_pos
else:
r.pos = eol_pos


class home(MotionCommand):
Expand All @@ -365,6 +386,20 @@ def do(self) -> None:
r.pos = r.bow()


class end_of_word(MotionCommand):
def do(self) -> None:
r = self.reader
for _ in range(r.get_arg()):
r.pos = r.vi_eow()


class vi_forward_word(MotionCommand):
def do(self) -> None:
r = self.reader
for _ in range(r.get_arg()):
r.pos = r.vi_forward_word()


class self_insert(EditCommand):
def do(self) -> None:
r = self.reader
Expand Down Expand Up @@ -503,3 +538,55 @@ def do(self) -> None:
)
self.reader.insert(data.replace(done, ""))
self.reader.last_refresh_cache.invalidated = True


class vi_normal_mode(Command):
def do(self) -> None:
self.reader.enter_normal_mode()


class vi_insert_mode(Command):
def do(self) -> None:
self.reader.enter_insert_mode()


class vi_append_mode(Command):
def do(self) -> None:
if self.reader.pos < len(self.reader.buffer):
self.reader.pos += 1
self.reader.enter_insert_mode()


class vi_append_eol(Command):
def do(self) -> None:
while self.reader.pos < len(self.reader.buffer):
if self.reader.buffer[self.reader.pos] == '\n':
break
self.reader.pos += 1
self.reader.enter_insert_mode()


class vi_insert_bol(Command):
def do(self) -> None:
self.reader.pos = self.reader.first_non_whitespace()
self.reader.enter_insert_mode()


class vi_open_below(Command):
def do(self) -> None:
while self.reader.pos < len(self.reader.buffer):
if self.reader.buffer[self.reader.pos] == '\n':
break
self.reader.pos += 1

self.reader.insert('\n')
self.reader.enter_insert_mode()

class vi_open_above(Command):
def do(self) -> None:
while self.reader.pos > 0 and self.reader.buffer[self.reader.pos - 1] != '\n':
self.reader.pos -= 1

self.reader.insert('\n')
self.reader.pos -= 1
self.reader.enter_insert_mode()
20 changes: 14 additions & 6 deletions Lib/_pyrepl/historical_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,19 +257,27 @@ def __post_init__(self) -> None:
)

def collect_keymap(self) -> tuple[tuple[KeySpec, CommandName], ...]:
return super().collect_keymap() + (
bindings: list[tuple[KeySpec, CommandName]] = [
(r"\C-n", "next-history"),
(r"\C-p", "previous-history"),
(r"\C-o", "operate-and-get-next"),
(r"\C-r", "reverse-history-isearch"),
(r"\C-s", "forward-history-isearch"),
(r"\M-r", "restore-history"),
(r"\M-.", "yank-arg"),
(r"\<page down>", "history-search-forward"),
(r"\x1b[6~", "history-search-forward"),
(r"\<page up>", "history-search-backward"),
(r"\x1b[5~", "history-search-backward"),
)
]

if not self.use_vi_mode:
bindings.extend(
[
(r"\M-r", "restore-history"),
(r"\M-.", "yank-arg"),
(r"\x1b[6~", "history-search-forward"),
(r"\x1b[5~", "history-search-backward"),
]
)

return super().collect_keymap() + tuple(bindings)

def select_item(self, i: int) -> None:
self.transient_history[self.historyi] = self.get_unicode()
Expand Down
Loading
Loading