diff --git a/prompt_toolkit/output/flush_stdout.py b/prompt_toolkit/output/flush_stdout.py new file mode 100644 index 000000000..4adcbd109 --- /dev/null +++ b/prompt_toolkit/output/flush_stdout.py @@ -0,0 +1,84 @@ +import errno +import os +import sys +from contextlib import contextmanager +from typing import IO, Iterator, TextIO, cast + +__all__ = ["flush_stdout"] + + +def flush_stdout(stdout: TextIO, data: str, write_binary: bool) -> None: + try: + # Ensure that `stdout` is made blocking when writing into it. + # Otherwise, when uvloop is activated (which makes stdout + # non-blocking), and we write big amounts of text, then we get a + # `BlockingIOError` here. + with _blocking_io(stdout): + # (We try to encode ourself, because that way we can replace + # characters that don't exist in the character set, avoiding + # UnicodeEncodeError crashes. E.g. u'\xb7' does not appear in 'ascii'.) + # My Arch Linux installation of july 2015 reported 'ANSI_X3.4-1968' + # for sys.stdout.encoding in xterm. + out: IO[bytes] + if write_binary: + if hasattr(stdout, "buffer"): + out = stdout.buffer + else: + # IO[bytes] was given to begin with. + # (Used in the unit tests, for instance.) + out = cast(IO[bytes], stdout) + out.write(data.encode(stdout.encoding or "utf-8", "replace")) + else: + stdout.write(data) + + stdout.flush() + except IOError as e: + if e.args and e.args[0] == errno.EINTR: + # Interrupted system call. Can happen in case of a window + # resize signal. (Just ignore. The resize handler will render + # again anyway.) + pass + elif e.args and e.args[0] == 0: + # This can happen when there is a lot of output and the user + # sends a KeyboardInterrupt by pressing Control-C. E.g. in + # a Python REPL when we execute "while True: print('test')". + # (The `ptpython` REPL uses this `Output` class instead of + # `stdout` directly -- in order to be network transparent.) + # So, just ignore. + pass + else: + raise + + +@contextmanager +def _blocking_io(io: IO[str]) -> Iterator[None]: + """ + Ensure that the FD for `io` is set to blocking in here. + """ + if sys.platform == "win32": + # On Windows, the `os` module doesn't have a `get/set_blocking` + # function. + yield + return + + try: + fd = io.fileno() + blocking = os.get_blocking(fd) + except: # noqa + # Failed somewhere. + # `get_blocking` can raise `OSError`. + # The io object can raise `AttributeError` when no `fileno()` method is + # present if we're not a real file object. + blocking = True # Assume we're good, and don't do anything. + + try: + # Make blocking if we weren't blocking yet. + if not blocking: + os.set_blocking(fd, True) + + yield + + finally: + # Restore original blocking mode. + if not blocking: + os.set_blocking(fd, blocking) diff --git a/prompt_toolkit/output/plain_text.py b/prompt_toolkit/output/plain_text.py new file mode 100644 index 000000000..c37b0cb4b --- /dev/null +++ b/prompt_toolkit/output/plain_text.py @@ -0,0 +1,138 @@ +from typing import List, TextIO + +from prompt_toolkit.data_structures import Size +from prompt_toolkit.styles import Attrs + +from .base import Output +from .color_depth import ColorDepth +from .flush_stdout import flush_stdout + +__all__ = ["PlainTextOutput"] + + +class PlainTextOutput(Output): + """ + Output that won't include any ANSI escape sequences. + + Useful when stdout is not a terminal. Maybe stdout is redirected to a file. + In this case, if `print_formatted_text` is used, for instance, we don't + want to include formatting. + + (The code is mostly identical to `Vt100_Output`, but without the + formatting.) + """ + + def __init__(self, stdout: TextIO, write_binary: bool = True) -> None: + assert all(hasattr(stdout, a) for a in ("write", "flush")) + + if write_binary: + assert hasattr(stdout, "encoding") + + self.stdout: TextIO = stdout + self.write_binary = write_binary + self._buffer: List[str] = [] + + def fileno(self) -> int: + "There is no sensible default for fileno()." + return self.stdout.fileno() + + def encoding(self) -> str: + return "utf-8" + + def write(self, data: str) -> None: + self._buffer.append(data) + + def write_raw(self, data: str) -> None: + self._buffer.append(data) + + def set_title(self, title: str) -> None: + pass + + def clear_title(self) -> None: + pass + + def flush(self) -> None: + if not self._buffer: + return + + data = "".join(self._buffer) + self._buffer = [] + flush_stdout(self.stdout, data, write_binary=self.write_binary) + + def erase_screen(self) -> None: + pass + + def enter_alternate_screen(self) -> None: + pass + + def quit_alternate_screen(self) -> None: + pass + + def enable_mouse_support(self) -> None: + pass + + def disable_mouse_support(self) -> None: + pass + + def erase_end_of_line(self) -> None: + pass + + def erase_down(self) -> None: + pass + + def reset_attributes(self) -> None: + pass + + def set_attributes(self, attrs: Attrs, color_depth: ColorDepth) -> None: + pass + + def disable_autowrap(self) -> None: + pass + + def enable_autowrap(self) -> None: + pass + + def cursor_goto(self, row: int = 0, column: int = 0) -> None: + pass + + def cursor_up(self, amount: int) -> None: + pass + + def cursor_down(self, amount: int) -> None: + self._buffer.append("\n") + + def cursor_forward(self, amount: int) -> None: + self._buffer.append(" " * amount) + + def cursor_backward(self, amount: int) -> None: + pass + + def hide_cursor(self) -> None: + pass + + def show_cursor(self) -> None: + pass + + def ask_for_cpr(self) -> None: + pass + + def bell(self) -> None: + pass + + def enable_bracketed_paste(self) -> None: + pass + + def disable_bracketed_paste(self) -> None: + pass + + def scroll_buffer_to_prompt(self) -> None: + pass + + def get_size(self) -> Size: + return Size(rows=40, columns=80) + + def get_rows_below_cursor_position(self) -> int: + return 8 + + def get_default_color_depth(self) -> ColorDepth: + return ColorDepth.DEPTH_1_BIT diff --git a/prompt_toolkit/output/vt100.py b/prompt_toolkit/output/vt100.py index 686303fa7..8cf2720df 100644 --- a/prompt_toolkit/output/vt100.py +++ b/prompt_toolkit/output/vt100.py @@ -7,11 +7,9 @@ http://pygments.org/ """ import array -import errno import io import os import sys -from contextlib import contextmanager from typing import ( IO, Callable, @@ -34,6 +32,7 @@ from prompt_toolkit.utils import is_dumb_terminal from .color_depth import ColorDepth +from .flush_stdout import flush_stdout __all__ = [ "Vt100_Output", @@ -673,46 +672,7 @@ def flush(self) -> None: data = "".join(self._buffer) self._buffer = [] - try: - # Ensure that `self.stdout` is made blocking when writing into it. - # Otherwise, when uvloop is activated (which makes stdout - # non-blocking), and we write big amounts of text, then we get a - # `BlockingIOError` here. - with blocking_io(self.stdout): - # (We try to encode ourself, because that way we can replace - # characters that don't exist in the character set, avoiding - # UnicodeEncodeError crashes. E.g. u'\xb7' does not appear in 'ascii'.) - # My Arch Linux installation of july 2015 reported 'ANSI_X3.4-1968' - # for sys.stdout.encoding in xterm. - out: IO[bytes] - if self.write_binary: - if hasattr(self.stdout, "buffer"): - out = self.stdout.buffer - else: - # IO[bytes] was given to begin with. - # (Used in the unit tests, for instance.) - out = cast(IO[bytes], self.stdout) - out.write(data.encode(self.stdout.encoding or "utf-8", "replace")) - else: - self.stdout.write(data) - - self.stdout.flush() - except IOError as e: - if e.args and e.args[0] == errno.EINTR: - # Interrupted system call. Can happen in case of a window - # resize signal. (Just ignore. The resize handler will render - # again anyway.) - pass - elif e.args and e.args[0] == 0: - # This can happen when there is a lot of output and the user - # sends a KeyboardInterrupt by pressing Control-C. E.g. in - # a Python REPL when we execute "while True: print('test')". - # (The `ptpython` REPL uses this `Output` class instead of - # `stdout` directly -- in order to be network transparent.) - # So, just ignore. - pass - else: - raise + flush_stdout(self.stdout, data, write_binary=self.write_binary) def ask_for_cpr(self) -> None: """ @@ -764,37 +724,3 @@ def get_default_color_depth(self) -> ColorDepth: return ColorDepth.DEPTH_4_BIT return ColorDepth.DEFAULT - - -@contextmanager -def blocking_io(io: IO[str]) -> Iterator[None]: - """ - Ensure that the FD for `io` is set to blocking in here. - """ - if sys.platform == "win32": - # On Windows, the `os` module doesn't have a `get/set_blocking` - # function. - yield - return - - try: - fd = io.fileno() - blocking = os.get_blocking(fd) - except: # noqa - # Failed somewhere. - # `get_blocking` can raise `OSError`. - # The io object can raise `AttributeError` when no `fileno()` method is - # present if we're not a real file object. - blocking = True # Assume we're good, and don't do anything. - - try: - # Make blocking if we weren't blocking yet. - if not blocking: - os.set_blocking(fd, True) - - yield - - finally: - # Restore original blocking mode. - if not blocking: - os.set_blocking(fd, blocking)