Skip to content

Commit

Permalink
Added PlainTextOutput: an output that doesn't write ANSI escape seque…
Browse files Browse the repository at this point in the history
…nces to the file.
  • Loading branch information
jonathanslenders committed Feb 3, 2022
1 parent 57b42c4 commit 402b6a3
Show file tree
Hide file tree
Showing 3 changed files with 224 additions and 76 deletions.
84 changes: 84 additions & 0 deletions prompt_toolkit/output/flush_stdout.py
@@ -0,0 +1,84 @@
import errno
import os
import sys
from contextlib import contextmanager
from typing import IO, Iterator, TextIO, cast

__all__ = ["flush_stdout"]


def flush_stdout(stdout: TextIO, data: str, write_binary: bool) -> None:
try:
# Ensure that `stdout` is made blocking when writing into it.
# Otherwise, when uvloop is activated (which makes stdout
# non-blocking), and we write big amounts of text, then we get a
# `BlockingIOError` here.
with _blocking_io(stdout):
# (We try to encode ourself, because that way we can replace
# characters that don't exist in the character set, avoiding
# UnicodeEncodeError crashes. E.g. u'\xb7' does not appear in 'ascii'.)
# My Arch Linux installation of july 2015 reported 'ANSI_X3.4-1968'
# for sys.stdout.encoding in xterm.
out: IO[bytes]
if write_binary:
if hasattr(stdout, "buffer"):
out = stdout.buffer
else:
# IO[bytes] was given to begin with.
# (Used in the unit tests, for instance.)
out = cast(IO[bytes], stdout)
out.write(data.encode(stdout.encoding or "utf-8", "replace"))
else:
stdout.write(data)

stdout.flush()
except IOError as e:
if e.args and e.args[0] == errno.EINTR:
# Interrupted system call. Can happen in case of a window
# resize signal. (Just ignore. The resize handler will render
# again anyway.)
pass
elif e.args and e.args[0] == 0:
# This can happen when there is a lot of output and the user
# sends a KeyboardInterrupt by pressing Control-C. E.g. in
# a Python REPL when we execute "while True: print('test')".
# (The `ptpython` REPL uses this `Output` class instead of
# `stdout` directly -- in order to be network transparent.)
# So, just ignore.
pass
else:
raise


@contextmanager
def _blocking_io(io: IO[str]) -> Iterator[None]:
"""
Ensure that the FD for `io` is set to blocking in here.
"""
if sys.platform == "win32":
# On Windows, the `os` module doesn't have a `get/set_blocking`
# function.
yield
return

try:
fd = io.fileno()
blocking = os.get_blocking(fd)
except: # noqa
# Failed somewhere.
# `get_blocking` can raise `OSError`.
# The io object can raise `AttributeError` when no `fileno()` method is
# present if we're not a real file object.
blocking = True # Assume we're good, and don't do anything.

try:
# Make blocking if we weren't blocking yet.
if not blocking:
os.set_blocking(fd, True)

yield

finally:
# Restore original blocking mode.
if not blocking:
os.set_blocking(fd, blocking)
138 changes: 138 additions & 0 deletions prompt_toolkit/output/plain_text.py
@@ -0,0 +1,138 @@
from typing import List, TextIO

from prompt_toolkit.data_structures import Size
from prompt_toolkit.styles import Attrs

from .base import Output
from .color_depth import ColorDepth
from .flush_stdout import flush_stdout

__all__ = ["PlainTextOutput"]


class PlainTextOutput(Output):
"""
Output that won't include any ANSI escape sequences.
Useful when stdout is not a terminal. Maybe stdout is redirected to a file.
In this case, if `print_formatted_text` is used, for instance, we don't
want to include formatting.
(The code is mostly identical to `Vt100_Output`, but without the
formatting.)
"""

def __init__(self, stdout: TextIO, write_binary: bool = True) -> None:
assert all(hasattr(stdout, a) for a in ("write", "flush"))

if write_binary:
assert hasattr(stdout, "encoding")

self.stdout: TextIO = stdout
self.write_binary = write_binary
self._buffer: List[str] = []

def fileno(self) -> int:
"There is no sensible default for fileno()."
return self.stdout.fileno()

def encoding(self) -> str:
return "utf-8"

def write(self, data: str) -> None:
self._buffer.append(data)

def write_raw(self, data: str) -> None:
self._buffer.append(data)

def set_title(self, title: str) -> None:
pass

def clear_title(self) -> None:
pass

def flush(self) -> None:
if not self._buffer:
return

data = "".join(self._buffer)
self._buffer = []
flush_stdout(self.stdout, data, write_binary=self.write_binary)

def erase_screen(self) -> None:
pass

def enter_alternate_screen(self) -> None:
pass

def quit_alternate_screen(self) -> None:
pass

def enable_mouse_support(self) -> None:
pass

def disable_mouse_support(self) -> None:
pass

def erase_end_of_line(self) -> None:
pass

def erase_down(self) -> None:
pass

def reset_attributes(self) -> None:
pass

def set_attributes(self, attrs: Attrs, color_depth: ColorDepth) -> None:
pass

def disable_autowrap(self) -> None:
pass

def enable_autowrap(self) -> None:
pass

def cursor_goto(self, row: int = 0, column: int = 0) -> None:
pass

def cursor_up(self, amount: int) -> None:
pass

def cursor_down(self, amount: int) -> None:
self._buffer.append("\n")

def cursor_forward(self, amount: int) -> None:
self._buffer.append(" " * amount)

def cursor_backward(self, amount: int) -> None:
pass

def hide_cursor(self) -> None:
pass

def show_cursor(self) -> None:
pass

def ask_for_cpr(self) -> None:
pass

def bell(self) -> None:
pass

def enable_bracketed_paste(self) -> None:
pass

def disable_bracketed_paste(self) -> None:
pass

def scroll_buffer_to_prompt(self) -> None:
pass

def get_size(self) -> Size:
return Size(rows=40, columns=80)

def get_rows_below_cursor_position(self) -> int:
return 8

def get_default_color_depth(self) -> ColorDepth:
return ColorDepth.DEPTH_1_BIT
78 changes: 2 additions & 76 deletions prompt_toolkit/output/vt100.py
Expand Up @@ -7,11 +7,9 @@
http://pygments.org/
"""
import array
import errno
import io
import os
import sys
from contextlib import contextmanager
from typing import (
IO,
Callable,
Expand All @@ -34,6 +32,7 @@
from prompt_toolkit.utils import is_dumb_terminal

from .color_depth import ColorDepth
from .flush_stdout import flush_stdout

__all__ = [
"Vt100_Output",
Expand Down Expand Up @@ -673,46 +672,7 @@ def flush(self) -> None:
data = "".join(self._buffer)
self._buffer = []

try:
# Ensure that `self.stdout` is made blocking when writing into it.
# Otherwise, when uvloop is activated (which makes stdout
# non-blocking), and we write big amounts of text, then we get a
# `BlockingIOError` here.
with blocking_io(self.stdout):
# (We try to encode ourself, because that way we can replace
# characters that don't exist in the character set, avoiding
# UnicodeEncodeError crashes. E.g. u'\xb7' does not appear in 'ascii'.)
# My Arch Linux installation of july 2015 reported 'ANSI_X3.4-1968'
# for sys.stdout.encoding in xterm.
out: IO[bytes]
if self.write_binary:
if hasattr(self.stdout, "buffer"):
out = self.stdout.buffer
else:
# IO[bytes] was given to begin with.
# (Used in the unit tests, for instance.)
out = cast(IO[bytes], self.stdout)
out.write(data.encode(self.stdout.encoding or "utf-8", "replace"))
else:
self.stdout.write(data)

self.stdout.flush()
except IOError as e:
if e.args and e.args[0] == errno.EINTR:
# Interrupted system call. Can happen in case of a window
# resize signal. (Just ignore. The resize handler will render
# again anyway.)
pass
elif e.args and e.args[0] == 0:
# This can happen when there is a lot of output and the user
# sends a KeyboardInterrupt by pressing Control-C. E.g. in
# a Python REPL when we execute "while True: print('test')".
# (The `ptpython` REPL uses this `Output` class instead of
# `stdout` directly -- in order to be network transparent.)
# So, just ignore.
pass
else:
raise
flush_stdout(self.stdout, data, write_binary=self.write_binary)

def ask_for_cpr(self) -> None:
"""
Expand Down Expand Up @@ -764,37 +724,3 @@ def get_default_color_depth(self) -> ColorDepth:
return ColorDepth.DEPTH_4_BIT

return ColorDepth.DEFAULT


@contextmanager
def blocking_io(io: IO[str]) -> Iterator[None]:
"""
Ensure that the FD for `io` is set to blocking in here.
"""
if sys.platform == "win32":
# On Windows, the `os` module doesn't have a `get/set_blocking`
# function.
yield
return

try:
fd = io.fileno()
blocking = os.get_blocking(fd)
except: # noqa
# Failed somewhere.
# `get_blocking` can raise `OSError`.
# The io object can raise `AttributeError` when no `fileno()` method is
# present if we're not a real file object.
blocking = True # Assume we're good, and don't do anything.

try:
# Make blocking if we weren't blocking yet.
if not blocking:
os.set_blocking(fd, True)

yield

finally:
# Restore original blocking mode.
if not blocking:
os.set_blocking(fd, blocking)

0 comments on commit 402b6a3

Please sign in to comment.