Skip to content

Commit

Permalink
Use readline for the debugger
Browse files Browse the repository at this point in the history
  • Loading branch information
ESultanik committed Jan 20, 2022
1 parent 84972c4 commit ce7d630
Showing 1 changed file with 100 additions and 26 deletions.
126 changes: 100 additions & 26 deletions polyfile/debugger.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from abc import ABC, abstractmethod
import atexit
from enum import Enum
from io import StringIO
from pathlib import Path
from pdb import Pdb
import readline
import sys
from typing import Any, Callable, ContextManager, Generic, Iterable, Iterator, List, Optional, Type, TypeVar, Union

Expand All @@ -16,6 +19,9 @@
log = getStatusLogger("polyfile")


HISTORY_PATH = Path.home() / ".polyfile_history"


class ANSIColor(Enum):
BLACK = 30
RED = 31
Expand Down Expand Up @@ -398,6 +404,51 @@ def value(self, new_value):
self.debugger._instrument()


class ANSIWriter:
def __init__(self, use_ansi: bool = True, escape_for_readline: bool = False):
self.use_ansi: bool = use_ansi
self.escape_for_readline: bool = escape_for_readline
self.data = StringIO()

@staticmethod
def format(
message: Any, bold: bool = False, dim: bool = False, color: Optional[ANSIColor] = None,
escape_for_readline: bool = False
) -> str:
prefixes: List[str] = []
if bold and not dim:
prefixes.append("\u001b[1m")
elif dim and not bold:
prefixes.append("\u001b[2m")
if color is not None:
prefixes.append(color.to_code())
if prefixes:
if escape_for_readline:
message = f"\001{''.join(prefixes)}\002{message!s}\001\u001b[0m\002"
else:
message = f"{''.join(prefixes)}{message!s}\u001b[0m"
else:
message = str(message)
return message

def write(self, message: Any, bold: bool = False, dim: bool = False, color: Optional[ANSIColor] = None,
escape_for_readline: Optional[bool] = None) -> str:
if self.use_ansi:
if escape_for_readline is None:
escape_for_readline = self.escape_for_readline
self.data.write(self.format(message=message, bold=bold, dim=dim, color=color,
escape_for_readline=escape_for_readline))
else:
self.data.write(str(message))

def __str__(self):
return self.data.getvalue()


def _disable(debugger: "Debugger"):
debugger.enabled = False


class Debugger(ContextManager["Debugger"]):
def __init__(self, break_on_parsing: bool = True):
self.instrumented_tests: List[InstrumentedTest] = []
Expand All @@ -411,6 +462,10 @@ def __init__(self, break_on_parsing: bool = True):
self.last_offset: int = 0
self.last_result: Optional[TestResult] = None
self.repl_test: Optional[MagicTest] = None
if sys.stderr.isatty():
self.repl_prompt: str = ANSIWriter.format("(polyfile) ", bold=True, escape_for_readline=True)
else:
self.repl_prompt = "(polyfile) "
self.instrumented_parsers: List[InstrumentedParser] = []
self.break_on_submatching: BreakOnSubmatching = BreakOnSubmatching(break_on_parsing, self)
self.variables_by_name: Dict[str, Variable] = {
Expand All @@ -421,6 +476,8 @@ def __init__(self, break_on_parsing: bool = True):
" disable from the command line with `--no-debug-python`)"
}
self._pdb: Optional[Pdb] = None
self._prev_history_length: int = 0
atexit.register(_disable, self)

def save_context(self):
class DebugContext:
Expand Down Expand Up @@ -463,12 +520,29 @@ def _instrument(self):

@enabled.setter
def enabled(self, is_enabled: bool):
was_enabled = self.enabled
self._uninstrument()
if is_enabled:
self._instrument()
try:
readline.read_history_file(HISTORY_PATH)
self._prev_history_length = readline.get_current_history_length()
except FileNotFoundError:
open(HISTORY_PATH, 'wb').close()
self._prev_history_length = 0
# default history len is -1 (infinite), which may grow unruly
readline.set_history_length(2048)
self.write(f"PolyFile {__version__}\n", color=ANSIColor.MAGENTA, bold=True)
self.write(f"{__copyright__}\n{__license__}\n\nFor help, type \"help\".\n")
self.repl()
elif was_enabled:
# we are now disabled, so store our history
new_length = readline.get_current_history_length()
try:
readline.append_history_file(max(new_length - self._prev_history_length, 0), HISTORY_PATH)
self._prev_history_length = readline.get_current_history_length()
except IOError as e:
log.warning(f"Unable to save history to {HISTORY_PATH!s}: {e!s}")

def __enter__(self) -> "Debugger":
self._entries += 1
Expand Down Expand Up @@ -528,30 +602,25 @@ def write(self, message: Any, bold: bool = False, dim: bool = False, color: Opti
if isinstance(message, MagicTest):
self.write_test(message)
return
prefixes: List[str] = []
if bold and not dim:
prefixes.append("\u001b[1m")
elif dim and not bold:
prefixes.append("\u001b[2m")
if color is not None:
prefixes.append(color.to_code())
if prefixes:
sys.stdout.write(f"{''.join(prefixes)}{message!s}\u001b[0m")
return
message = ANSIWriter.format(message=message, bold=bold, dim=dim, color=color)
sys.stdout.write(str(message))

def prompt(self, message: str, default: bool = True) -> bool:
while True:
self.write(f"{message} ", bold=True)
self.write("[", dim=True)
buffer = ANSIWriter(use_ansi=sys.stderr.isatty(), escape_for_readline=True)
buffer.write(f"{message} ", bold=True)
buffer.write("[", dim=True)
if default:
self.write("Y", bold=True, color=ANSIColor.GREEN)
self.write("n", dim=True, color=ANSIColor.RED)
buffer.write("Y", bold=True, color=ANSIColor.GREEN)
buffer.write("n", dim=True, color=ANSIColor.RED)
else:
self.write("y", dim=True, color=ANSIColor.GREEN)
self.write("N", bold=True, color=ANSIColor.RED)
self.write("] ", dim=True)
answer = input().strip().lower()
buffer.write("y", dim=True, color=ANSIColor.GREEN)
buffer.write("N", bold=True, color=ANSIColor.RED)
buffer.write("] ", dim=True)
try:
answer = input(str(buffer)).strip().lower()
except EOFError:
raise KeyboardInterrupt()
if not answer:
return default
elif answer == "n":
Expand Down Expand Up @@ -703,15 +772,22 @@ def print_location():
self.print_match(match)
print_location()
self.write(f"About to parse for submatches using {instrumented_parser.parser!s}.\n")
if not self.prompt("Debug using PDB? "
f"\u001b[2m(disable this prompt with {ANSIColor.BLUE.to_code()}set break_on_parsing False"
"\u001b[0m\u001b[2m)\u001b[0m",
default=False):
buffer = ANSIWriter(use_ansi=sys.stderr.isatty(), escape_for_readline=True)
buffer.write("Debug using PDB? ")
buffer.write("(disable this prompt with `", dim=True)
buffer.write("set ", color=ANSIColor.BLUE)
buffer.write("break_on_parsing ", color=ANSIColor.GREEN)
buffer.write("False", color=ANSIColor.CYAN)
buffer.write("`)", dim=True)
if not self.prompt(str(buffer), default=False):
yield from parse(file_stream, match)
return
try:
self._pdb = Pdb(skip=["polyfile.magic_debugger", "polyfile.magic"])
self._pdb.prompt = "\u001b[1m(polyfile-Pdb)\u001b[0m "
if sys.stderr.isatty():
self._pdb.prompt = "\001\u001b[1m\002(polyfile-Pdb)\001\u001b[0m\002 "
else:
self._pdb.prompt = "(polyfile-Pdb) "
generator = parse(file_stream, match)
while True:
try:
Expand Down Expand Up @@ -740,9 +816,7 @@ def repl(self):
self.print_where()
while True:
try:
self.write("(polyfile) ", bold=True)
sys.stdout.flush()
command = input()
command = input(self.repl_prompt)
except EOFError:
# the user pressed ^D to quit
exit(0)
Expand Down

0 comments on commit ce7d630

Please sign in to comment.