Skip to content

Commit

Permalink
Don't let error display be part of support library.
Browse files Browse the repository at this point in the history
  • Loading branch information
rickardlindberg committed Apr 30, 2020
1 parent 24448fc commit b24d5cc
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 67 deletions.
12 changes: 11 additions & 1 deletion writing/rlmeta-vm-poster/compile.sh
Expand Up @@ -17,6 +17,7 @@ codegenerator_py=$(python "$rlmeta_compiler" < codegenerator.rlmeta)

cat <<EOF
import sys
import pprint
SUPPORT = $support_py_string
Expand All @@ -38,5 +39,14 @@ if __name__ == "__main__":
)
)
except _MatchError as e:
sys.exit(e.describe())
MARKER = "\033[0;31m<ERROR POSITION>\033[0m"
if isinstance(e.stream, basestring):
stream_string = e.stream[:e.pos] + MARKER + e.stream[e.pos:]
else:
stream_string = pprint.pformat(e.stream)
sys.exit("ERROR: {}\nPOSITION: {}\nSTREAM:\n{}".format(
e.message,
e.pos,
indent(stream_string)
))
EOF
47 changes: 13 additions & 34 deletions writing/rlmeta-vm-poster/rlmeta.py
@@ -1,6 +1,7 @@
import sys
import pprint

SUPPORT = 'def rlmeta_vm(instructions, labels, start_rule, stream):\n label_counter = 0\n last_action = _ConstantSemanticAction(None)\n pc = labels[start_rule]\n call_backtrack_stack = []\n stream, pos, stream_pos_stack = (stream, 0, [])\n scope, scope_stack = (None, [])\n fail_message = None\n latest_fail_message, latest_fail_pos = (None, tuple())\n memo = {}\n while True:\n name, arg1, arg2 = instructions[pc]\n if name == "PUSH_SCOPE":\n scope_stack.append(scope)\n scope = {}\n pc += 1\n continue\n elif name == "BACKTRACK":\n call_backtrack_stack.append((labels[arg1], pos, len(stream_pos_stack), len(scope_stack)))\n pc += 1\n continue\n elif name == "CALL":\n key = (arg1, tuple([x[1] for x in stream_pos_stack]+[pos]))\n if key in memo:\n last_action, stream_pos_stack = memo[key]\n stream_pos_stack = stream_pos_stack[:]\n stream, pos = stream_pos_stack.pop()\n pc += 1\n else:\n call_backtrack_stack.append((pc+1, key))\n pc = labels[arg1]\n continue\n elif name == "MATCH_CHARSEQ":\n for char in arg1:\n if pos >= len(stream) or stream[pos] != char:\n fail_message = ("expected {!r}", char)\n break\n pos += 1\n else:\n last_action = _ConstantSemanticAction(arg1)\n pc += 1\n continue\n elif name == "COMMIT":\n call_backtrack_stack.pop()\n pc = labels[arg1]\n continue\n elif name == "POP_SCOPE":\n scope = scope_stack.pop()\n pc += 1\n continue\n elif name == "RETURN":\n if len(call_backtrack_stack) == 0:\n return last_action.eval()\n pc, key = call_backtrack_stack.pop()\n memo[key] = (last_action, stream_pos_stack+[(stream, pos)])\n continue\n elif name == "LIST_APPEND":\n scope.append(last_action)\n pc += 1\n continue\n elif name == "BIND":\n scope[arg1] = last_action\n pc += 1\n continue\n elif name == "ACTION":\n last_action = _UserSemanticAction(arg1, scope)\n pc += 1\n continue\n elif name == "MATCH_RANGE":\n if pos >= len(stream) or not (arg1 <= stream[pos] <= arg2):\n fail_message = ("expected range {!r}-{!r}", arg1, arg2)\n else:\n last_action = _ConstantSemanticAction(stream[pos])\n pos += 1\n pc += 1\n continue\n elif name == "LIST_START":\n scope_stack.append(scope)\n scope = []\n pc += 1\n continue\n elif name == "LIST_END":\n last_action = _UserSemanticAction(lambda xs: [x.eval() for x in xs], scope)\n scope = scope_stack.pop()\n pc += 1\n continue\n elif name == "MATCH_ANY":\n if pos >= len(stream):\n fail_message = ("expected any",)\n else:\n last_action = _ConstantSemanticAction(stream[pos])\n pos += 1\n pc += 1\n continue\n elif name == "PUSH_STREAM":\n if pos >= len(stream) or not isinstance(stream[pos], list):\n fail_message = ("expected list",)\n else:\n stream_pos_stack.append((stream, pos))\n stream = stream[pos]\n pos = 0\n pc += 1\n continue\n elif name == "POP_STREAM":\n if pos < len(stream):\n fail_message = ("expected end of list",)\n else:\n stream, pos = stream_pos_stack.pop()\n pos += 1\n pc += 1\n continue\n elif name == "MATCH_CALL_RULE":\n if pos >= len(stream):\n fail_message = ("expected any",)\n else:\n fn_name = str(stream[pos])\n key = (fn_name, tuple([x[1] for x in stream_pos_stack]+[pos]))\n if key in memo:\n last_action, stream_pos_stack = memo[key]\n stream_pos_stack = stream_pos_stack[:]\n stream, pos = stream_pos_stack.pop()\n pc += 1\n else:\n call_backtrack_stack.append((pc+1, key))\n pc = labels[fn_name]\n pos += 1\n continue\n elif name == "FAIL":\n fail_message = (arg1,)\n elif name == "LABEL":\n last_action = _ConstantSemanticAction(label_counter)\n label_counter += 1\n pc += 1\n continue\n elif name == "MATCH_STRING":\n if pos >= len(stream) or stream[pos] != arg1:\n fail_message = ("expected {!r}", arg1)\n else:\n last_action = _ConstantSemanticAction(arg1)\n pos += 1\n pc += 1\n continue\n else:\n raise Exception("unknown instruction {}".format(name))\n fail_pos = tuple([x[1] for x in stream_pos_stack]+[pos])\n if fail_pos >= latest_fail_pos:\n latest_fail_message = fail_message\n latest_fail_pos = fail_pos\n call_backtrack_entry = tuple()\n while call_backtrack_stack:\n call_backtrack_entry = call_backtrack_stack.pop()\n if len(call_backtrack_entry) == 4:\n break\n if len(call_backtrack_entry) != 4:\n fail_pos = list(latest_fail_pos)\n fail_stream = stream_pos_stack[0][0] if stream_pos_stack else stream\n while len(fail_pos) > 1:\n fail_stream = fail_stream[fail_pos.pop(0)]\n raise _MatchError(latest_fail_message, fail_pos[0], fail_stream)\n (pc, pos, stream_stack_len, scope_stack_len) = call_backtrack_entry\n if len(stream_pos_stack) > stream_stack_len:\n stream = stream_pos_stack[stream_stack_len][0]\n stream_pos_stack = stream_pos_stack[:stream_stack_len]\n if len(scope_stack) > scope_stack_len:\n scope = scope_stack[scope_stack_len]\n scope_stack = scope_stack[:scope_stack_len]\n\nclass _Grammar(object):\n\n def run(self, rule_name, stream):\n return rlmeta_vm(self._instructions, self._labels, rule_name, stream)\n\nclass _ConstantSemanticAction(object):\n\n def __init__(self, value):\n self.value = value\n\n def eval(self):\n return self.value\n\nclass _UserSemanticAction(object):\n\n def __init__(self, fn, scope):\n self.fn = fn\n self.scope = scope\n\n def eval(self):\n return self.fn(self.scope)\n\nclass _MatchError(Exception):\n\n def __init__(self, message, pos, stream):\n Exception.__init__(self)\n self.message = message\n self.pos = pos\n self.stream = stream\n\n def describe(self):\n message = ""\n if isinstance(self.stream, basestring):\n before = self.stream[:self.pos].splitlines()\n after = self.stream[self.pos:].splitlines()\n for context_before in before[-4:-1]:\n message += self._context(context_before)\n message += self._context(before[-1], after[0])\n message += self._arrow(len(before[-1]))\n for context_after in after[1:4]:\n message += self._context(context_after)\n else:\n message += self._context("[")\n for context_before in self.stream[:self.pos]:\n message += self._context(" ", repr(context_before), ",")\n message += self._context(" ", repr(self.stream[self.pos]), ",")\n message += self._arrow(2)\n for context_after in self.stream[self.pos+1:]:\n message += self._context(" ", repr(context_after), ",")\n message += self._context("]")\n message += "Error: "\n message += self.message[0].format(*self.message[1:])\n message += "\\n"\n return message\n\n def _context(self, *args):\n return "> {}\\n".format("".join(args))\n\n def _arrow(self, lenght):\n return "--{}^\\n".format("-"*lenght)\n\ndef join(items):\n return "".join(\n join(item) if isinstance(item, list) else str(item)\n for item in items\n )\n\ndef indent(text):\n return join(join([" ", x]) for x in text.splitlines(True))\n'
SUPPORT = 'def rlmeta_vm(instructions, labels, start_rule, stream):\n label_counter = 0\n last_action = _ConstantSemanticAction(None)\n pc = labels[start_rule]\n call_backtrack_stack = []\n stream, pos, stream_pos_stack = (stream, 0, [])\n scope, scope_stack = (None, [])\n fail_message = None\n latest_fail_message, latest_fail_pos = (None, tuple())\n memo = {}\n while True:\n name, arg1, arg2 = instructions[pc]\n if name == "PUSH_SCOPE":\n scope_stack.append(scope)\n scope = {}\n pc += 1\n continue\n elif name == "BACKTRACK":\n call_backtrack_stack.append((labels[arg1], pos, len(stream_pos_stack), len(scope_stack)))\n pc += 1\n continue\n elif name == "CALL":\n key = (arg1, tuple([x[1] for x in stream_pos_stack]+[pos]))\n if key in memo:\n last_action, stream_pos_stack = memo[key]\n stream_pos_stack = stream_pos_stack[:]\n stream, pos = stream_pos_stack.pop()\n pc += 1\n else:\n call_backtrack_stack.append((pc+1, key))\n pc = labels[arg1]\n continue\n elif name == "MATCH_CHARSEQ":\n for char in arg1:\n if pos >= len(stream) or stream[pos] != char:\n fail_message = ("expected {!r}", char)\n break\n pos += 1\n else:\n last_action = _ConstantSemanticAction(arg1)\n pc += 1\n continue\n elif name == "COMMIT":\n call_backtrack_stack.pop()\n pc = labels[arg1]\n continue\n elif name == "POP_SCOPE":\n scope = scope_stack.pop()\n pc += 1\n continue\n elif name == "RETURN":\n if len(call_backtrack_stack) == 0:\n return last_action.eval()\n pc, key = call_backtrack_stack.pop()\n memo[key] = (last_action, stream_pos_stack+[(stream, pos)])\n continue\n elif name == "LIST_APPEND":\n scope.append(last_action)\n pc += 1\n continue\n elif name == "BIND":\n scope[arg1] = last_action\n pc += 1\n continue\n elif name == "ACTION":\n last_action = _UserSemanticAction(arg1, scope)\n pc += 1\n continue\n elif name == "MATCH_RANGE":\n if pos >= len(stream) or not (arg1 <= stream[pos] <= arg2):\n fail_message = ("expected range {!r}-{!r}", arg1, arg2)\n else:\n last_action = _ConstantSemanticAction(stream[pos])\n pos += 1\n pc += 1\n continue\n elif name == "LIST_START":\n scope_stack.append(scope)\n scope = []\n pc += 1\n continue\n elif name == "LIST_END":\n last_action = _UserSemanticAction(lambda xs: [x.eval() for x in xs], scope)\n scope = scope_stack.pop()\n pc += 1\n continue\n elif name == "MATCH_ANY":\n if pos >= len(stream):\n fail_message = ("expected any",)\n else:\n last_action = _ConstantSemanticAction(stream[pos])\n pos += 1\n pc += 1\n continue\n elif name == "PUSH_STREAM":\n if pos >= len(stream) or not isinstance(stream[pos], list):\n fail_message = ("expected list",)\n else:\n stream_pos_stack.append((stream, pos))\n stream = stream[pos]\n pos = 0\n pc += 1\n continue\n elif name == "POP_STREAM":\n if pos < len(stream):\n fail_message = ("expected end of list",)\n else:\n stream, pos = stream_pos_stack.pop()\n pos += 1\n pc += 1\n continue\n elif name == "MATCH_CALL_RULE":\n if pos >= len(stream):\n fail_message = ("expected any",)\n else:\n fn_name = str(stream[pos])\n key = (fn_name, tuple([x[1] for x in stream_pos_stack]+[pos]))\n if key in memo:\n last_action, stream_pos_stack = memo[key]\n stream_pos_stack = stream_pos_stack[:]\n stream, pos = stream_pos_stack.pop()\n pc += 1\n else:\n call_backtrack_stack.append((pc+1, key))\n pc = labels[fn_name]\n pos += 1\n continue\n elif name == "FAIL":\n fail_message = (arg1,)\n elif name == "LABEL":\n last_action = _ConstantSemanticAction(label_counter)\n label_counter += 1\n pc += 1\n continue\n elif name == "MATCH_STRING":\n if pos >= len(stream) or stream[pos] != arg1:\n fail_message = ("expected {!r}", arg1)\n else:\n last_action = _ConstantSemanticAction(arg1)\n pos += 1\n pc += 1\n continue\n else:\n raise Exception("unknown instruction {}".format(name))\n fail_pos = tuple([x[1] for x in stream_pos_stack]+[pos])\n if fail_pos >= latest_fail_pos:\n latest_fail_message = fail_message\n latest_fail_pos = fail_pos\n call_backtrack_entry = tuple()\n while call_backtrack_stack:\n call_backtrack_entry = call_backtrack_stack.pop()\n if len(call_backtrack_entry) == 4:\n break\n if len(call_backtrack_entry) != 4:\n fail_pos = list(latest_fail_pos)\n fail_stream = stream_pos_stack[0][0] if stream_pos_stack else stream\n while len(fail_pos) > 1:\n fail_stream = fail_stream[fail_pos.pop(0)]\n raise _MatchError(latest_fail_message, fail_pos[0], fail_stream)\n (pc, pos, stream_stack_len, scope_stack_len) = call_backtrack_entry\n if len(stream_pos_stack) > stream_stack_len:\n stream = stream_pos_stack[stream_stack_len][0]\n stream_pos_stack = stream_pos_stack[:stream_stack_len]\n if len(scope_stack) > scope_stack_len:\n scope = scope_stack[scope_stack_len]\n scope_stack = scope_stack[:scope_stack_len]\n\nclass _Grammar(object):\n\n def run(self, rule_name, stream):\n return rlmeta_vm(self._instructions, self._labels, rule_name, stream)\n\nclass _ConstantSemanticAction(object):\n\n def __init__(self, value):\n self.value = value\n\n def eval(self):\n return self.value\n\nclass _UserSemanticAction(object):\n\n def __init__(self, fn, scope):\n self.fn = fn\n self.scope = scope\n\n def eval(self):\n return self.fn(self.scope)\n\nclass _MatchError(Exception):\n\n def __init__(self, message, pos, stream):\n Exception.__init__(self)\n self.message = message[0].format(*message[1:])\n self.pos = pos\n self.stream = stream\n\ndef join(items):\n return "".join(\n join(item) if isinstance(item, list) else str(item)\n for item in items\n )\n\ndef indent(text):\n return join(join([" ", x]) for x in text.splitlines(True))\n'

def rlmeta_vm(instructions, labels, start_rule, stream):
label_counter = 0
Expand Down Expand Up @@ -195,41 +196,10 @@ class _MatchError(Exception):

def __init__(self, message, pos, stream):
Exception.__init__(self)
self.message = message
self.message = message[0].format(*message[1:])
self.pos = pos
self.stream = stream

def describe(self):
message = ""
if isinstance(self.stream, basestring):
before = self.stream[:self.pos].splitlines()
after = self.stream[self.pos:].splitlines()
for context_before in before[-4:-1]:
message += self._context(context_before)
message += self._context(before[-1], after[0])
message += self._arrow(len(before[-1]))
for context_after in after[1:4]:
message += self._context(context_after)
else:
message += self._context("[")
for context_before in self.stream[:self.pos]:
message += self._context(" ", repr(context_before), ",")
message += self._context(" ", repr(self.stream[self.pos]), ",")
message += self._arrow(2)
for context_after in self.stream[self.pos+1:]:
message += self._context(" ", repr(context_after), ",")
message += self._context("]")
message += "Error: "
message += self.message[0].format(*self.message[1:])
message += "\n"
return message

def _context(self, *args):
return "> {}\n".format("".join(args))

def _arrow(self, lenght):
return "--{}^\n".format("-"*lenght)

def join(items):
return "".join(
join(item) if isinstance(item, list) else str(item)
Expand Down Expand Up @@ -1090,4 +1060,13 @@ def LABEL(name):
)
)
except _MatchError as e:
sys.exit(e.describe())
MARKER = "\033[0;31m<ERROR POSITION>\033[0m"
if isinstance(e.stream, basestring):
stream_string = e.stream[:e.pos] + MARKER + e.stream[e.pos:]
else:
stream_string = pprint.pformat(e.stream)
sys.exit("ERROR: {}\nPOSITION: {}\nSTREAM:\n{}".format(
e.message,
e.pos,
indent(stream_string)
))

0 comments on commit b24d5cc

Please sign in to comment.