Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 25 additions & 19 deletions Lib/_pyrepl/fancy_termios.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,25 @@
import termios


TYPE_CHECKING = False

if TYPE_CHECKING:
from typing import cast
else:
cast = lambda typ, val: val


class TermState:
def __init__(self, tuples):
(
self.iflag,
self.oflag,
self.cflag,
self.lflag,
self.ispeed,
self.ospeed,
self.cc,
) = tuples
def __init__(self, attrs: list[int | list[bytes]]) -> None:
self.iflag = cast(int, attrs[0])
self.oflag = cast(int, attrs[1])
self.cflag = cast(int, attrs[2])
self.lflag = cast(int, attrs[3])
self.ispeed = cast(int, attrs[4])
self.ospeed = cast(int, attrs[5])
self.cc = cast(list[bytes], attrs[6])

def as_list(self):
def as_list(self) -> list[int | list[bytes]]:
return [
self.iflag,
self.oflag,
Expand All @@ -45,32 +51,32 @@ def as_list(self):
self.cc[:],
]

def copy(self):
def copy(self) -> "TermState":
return self.__class__(self.as_list())


def tcgetattr(fd):
def tcgetattr(fd: int) -> TermState:
return TermState(termios.tcgetattr(fd))


def tcsetattr(fd, when, attrs):
def tcsetattr(fd: int, when: int, attrs: TermState) -> None:
termios.tcsetattr(fd, when, attrs.as_list())


class Term(TermState):
TS__init__ = TermState.__init__

def __init__(self, fd=0):
def __init__(self, fd: int = 0) -> None:
self.TS__init__(termios.tcgetattr(fd))
self.fd = fd
self.stack = []
self.stack: list[list[int | list[bytes]]] = []

def save(self):
def save(self) -> None:
self.stack.append(self.as_list())

def set(self, when=termios.TCSANOW):
def set(self, when: int = termios.TCSANOW) -> None:
termios.tcsetattr(self.fd, when, self.as_list())

def restore(self):
def restore(self) -> None:
self.TS__init__(self.stack.pop())
self.set()
59 changes: 37 additions & 22 deletions Lib/_pyrepl/unix_console.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

from . import terminfo
from .console import Console, Event
from .fancy_termios import tcgetattr, tcsetattr
from .fancy_termios import tcgetattr, tcsetattr, TermState
from .trace import trace
from .unix_eventqueue import EventQueue
from .utils import wlen
Expand All @@ -51,16 +51,19 @@

# types
if TYPE_CHECKING:
from typing import IO, Literal, overload
from typing import AbstractSet, IO, Literal, overload, cast
else:
overload = lambda func: None
cast = lambda typ, val: val


class InvalidTerminal(RuntimeError):
pass
def __init__(self, message: str) -> None:
super().__init__(errno.EIO, message)


_error = (termios.error, InvalidTerminal)
_error_codes_to_ignore = frozenset([errno.EIO, errno.ENXIO, errno.EPERM])

SIGWINCH_EVENT = "repaint"

Expand Down Expand Up @@ -125,12 +128,13 @@ def __init__(self):

def register(self, fd, flag):
self.fd = fd

# note: The 'timeout' argument is received as *milliseconds*
def poll(self, timeout: float | None = None) -> list[int]:
if timeout is None:
r, w, e = select.select([self.fd], [], [])
else:
r, w, e = select.select([self.fd], [], [], timeout/1000)
r, w, e = select.select([self.fd], [], [], timeout / 1000)
return r

poll = MinimalPoll # type: ignore[assignment]
Expand Down Expand Up @@ -164,8 +168,15 @@ def __init__(
and os.getenv("TERM_PROGRAM") == "Apple_Terminal"
)

try:
self.__input_fd_set(tcgetattr(self.input_fd), ignore=frozenset())
except _error as e:
raise RuntimeError(f"termios failure ({e.args[1]})")

@overload
def _my_getstr(cap: str, optional: Literal[False] = False) -> bytes: ...
def _my_getstr(
cap: str, optional: Literal[False] = False
) -> bytes: ...

@overload
def _my_getstr(cap: str, optional: bool) -> bytes | None: ...
Expand Down Expand Up @@ -205,7 +216,9 @@ def _my_getstr(cap: str, optional: bool = False) -> bytes | None:

self.__setup_movement()

self.event_queue = EventQueue(self.input_fd, self.encoding, self.terminfo)
self.event_queue = EventQueue(
self.input_fd, self.encoding, self.terminfo
)
self.cursor_visible = 1

signal.signal(signal.SIGCONT, self._sigcont_handler)
Expand All @@ -217,7 +230,6 @@ def _sigcont_handler(self, signum, frame):
def __read(self, n: int) -> bytes:
return os.read(self.input_fd, n)


def change_encoding(self, encoding: str) -> None:
"""
Change the encoding used for I/O operations.
Expand Down Expand Up @@ -329,6 +341,8 @@ def prepare(self):
"""
Prepare the console for input/output operations.
"""
self.__buffer = []

self.__svtermstate = tcgetattr(self.input_fd)
raw = self.__svtermstate.copy()
raw.iflag &= ~(termios.INPCK | termios.ISTRIP | termios.IXON)
Expand All @@ -340,14 +354,7 @@ def prepare(self):
raw.lflag |= termios.ISIG
raw.cc[termios.VMIN] = 1
raw.cc[termios.VTIME] = 0
try:
tcsetattr(self.input_fd, termios.TCSADRAIN, raw)
except termios.error as e:
if e.args[0] != errno.EIO:
# gh-135329: when running under external programs (like strace),
# tcsetattr may fail with EIO. We can safely ignore this
# and continue with default terminal settings.
raise
self.__input_fd_set(raw)

# In macOS terminal we need to deactivate line wrap via ANSI escape code
if self.is_apple_terminal:
Expand All @@ -356,8 +363,6 @@ def prepare(self):
self.screen = []
self.height, self.width = self.getheightwidth()

self.__buffer = []

self.posxy = 0, 0
self.__gone_tall = 0
self.__move = self.__move_short
Expand All @@ -379,11 +384,7 @@ def restore(self):
self.__disable_bracketed_paste()
self.__maybe_write_code(self._rmkx)
self.flushoutput()
try:
tcsetattr(self.input_fd, termios.TCSADRAIN, self.__svtermstate)
except termios.error as e:
if e.args[0] != errno.EIO:
raise
self.__input_fd_set(self.__svtermstate)

if self.is_apple_terminal:
os.write(self.output_fd, b"\033[?7h")
Expand Down Expand Up @@ -820,3 +821,17 @@ def __tputs(self, fmt, prog=delayprog):
os.write(self.output_fd, self._pad * nchars)
else:
time.sleep(float(delay) / 1000.0)

def __input_fd_set(
self,
state: TermState,
ignore: AbstractSet[int] = _error_codes_to_ignore,
) -> bool:
try:
tcsetattr(self.input_fd, termios.TCSADRAIN, state)
except termios.error as te:
if te.args[0] not in ignore:
raise
return False
else:
return True
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Don't run PyREPL in a degraded environment where setting termios attributes
is not allowed.
Loading