In [1]:
#| default_exp earley
from nbdev import *
from nbdev.showdoc import *

# Earley Parser

> Recognizes and parses messages against a context-free grammar using the Earley algorithm

In [2]:
#| exporti
from pprint import pprint
from typing import NamedTuple
from functools import cache
from collections import deque

In [3]:
# | export 

class State(NamedTuple):
    """One Earley item: a grammar rule plus how much of it has been matched."""
    symbol: str          # left-hand side of the rule
    rule: tuple          # right-hand side, a tuple of symbols
    startidx: int        # index of the column in which the rule was started
    position: int        # how many symbols of `rule` have been matched so far
    predecessor: object  # the State this one was advanced from (False when there is none)
    creator: object      # whatever caused this state: a State or a marker string

    def __repr__(self):
        """Render as ``symbol: matched | remaining idx(start) [Done])``."""
        matched = str(self.rule[:self.position])
        remaining = str(self.rule[self.position:])
        status = 'Done' if self.is_complete() else ''
        return f"{self.symbol:<3}: {matched:<15} | {remaining:<30} idx({self.startidx}) {status})"

    def is_complete(self):
        """True once every symbol of the rule has been matched."""
        return len(self.rule) == self.position

    def nextsym(self):
        """Return the next symbol the rule expects, or False when the rule is complete."""
        if self.is_complete():
            return False
        return self.rule[self.position]

class Earley:
    """Earley-style recogniser/parser for a context-free grammar.

    The grammar is a mapping ``{symbol: (alternative, ...)}`` where every
    alternative is a tuple of symbols (strings). Symbols that are not keys of
    the grammar are never expanded; they can only be matched against the
    items of the message.

    Empty alternatives (epsilon rules) are not supported: prediction reads the
    first symbol of every alternative it expands.

    NOTE(review): states carry their predecessor/creator, so different
    derivations of the same item are never merged. A grammar with a unit-rule
    cycle (A -> B, B -> A) looks like it would keep producing new states
    forever - confirm before using such grammars.
    """

    def __init__(self, grammar):
        """Validate and store the grammar (a mapping, see the class docstring)."""
        # one production symbol can generate 1-n options, each consisting of 1-n strings
        assert all(isinstance(k, str) for k in grammar), 'a symbol should be a string'
        assert all(isinstance(v, tuple) for v in grammar.values()), 'a symbol should contain a tuple of 1 or more production rules'
        assert all(isinstance(option, tuple) for v in grammar.values() for option in v), 'rules in the grammar should be tuples'
        assert all(isinstance(ch, str) for v in grammar.values() for option in v for ch in option), 'symbols in the grammar should be strings'
        self.grammar = grammar

    def find(self, cols, ans, endstate=False, start=False):
        """Collect the states behind a finished parse into ``ans``.

        cols: the columns returned by ``parse(..., returncols=True)``.
        ans: deque that states are pushed onto from the left; it is mutated
            and returned.
        endstate: state to walk back from (used by the recursive calls).
        start: truthy to seed the walk with the completed start-symbol states
            of the last column. Pass the start symbol itself when it is not
            '0'; any other truthy value (e.g. ``True``) means '0'.

        Returns ``ans``; it is returned unchanged when there is nothing to
        walk back from.
        """
        if start:
            startsym = start if isinstance(start, str) else '0'
            for state in cols[-1]:
                if state.is_complete() and state.startidx == 0 and state.symbol == startsym:
                    ans.appendleft(state)
            if not ans:
                # no complete parse: ans[0] below would raise IndexError
                return ans
        elif not isinstance(endstate, State):
            # creators of scanned/predicted states are marker strings, not states
            return ans
        else:
            ans.appendleft(endstate)

        cur = ans[0]
        while isinstance(cur, State) and isinstance(cur.predecessor, State):
            # first everything that led to the symbol just consumed, then the
            # state this one was advanced from
            self.find(cols, ans, cur.creator)
            ans.appendleft(cur.predecessor)
            cur = ans[0]

        return ans

    def printparse(self, ans):
        """Print the states that open a rule (position 0), indented by nesting depth."""
        # currently only prints the beginning of a rule, with its descendants indented
        indent = -2
        for state in ans:
            if state.position == 0:
                indent += 2
                print('.'*indent, state)
            if state.is_complete():
                indent -= 2

    def isvalid(self, cols, start):
        """True when the last column holds a completed ``start`` rule that began at index 0."""
        return any(
            state.is_complete() and state.startidx == 0 and state.symbol == start
            for state in cols[-1]
        )

    def parse(self, message, start, verbose=False, returncols=False):
        """Check whether ``message`` can be derived from the ``start`` symbol.

        message: indexable sequence whose items are compared with grammar symbols.
        start: the start symbol (must be hashable, it is used as a cache key).
        verbose: print progress information while parsing.
        returncols: return the list of state sets (one per position, plus the
            initial one) instead of a bool.
        """
        def scan(column, parsedsym, advancedstate):
            # advance every state in the column that is waiting for parsedsym
            advancedstates = [
                State(state.symbol, state.rule, state.startidx, state.position + 1, state, advancedstate)
                for state in column
                if not state.is_complete() and state.nextsym() == parsedsym
            ]
            if verbose:
                # states are NamedTuples: fields are attributes, not string keys
                print('parsed symbol:', message[colidx-1], ':we have advances on', [s.symbol for s in advancedstates])
            return advancedstates

        @cache
        def predict(colidx, symbol):
            # expand symbol, and transitively the first symbol of each expansion;
            # `seen` prevents looping forever on (left-)recursive rules
            seen = {symbol}
            toadd = [symbol]
            predicted = set()
            while toadd:
                cur = toadd.pop()
                for rule in self.grammar.get(cur, []):
                    predicted.add(State(cur, rule, colidx, 0, False, 'creator'))
                    if (first := rule[0]) not in seen:
                        toadd.append(first)
                        seen.add(first)
            return frozenset(predicted)

        def completer(cols, advancedstates):
            # process advanced states until none are left: a completed rule
            # advances the states that were waiting for it, anything else
            # predicts what it expects next
            while advancedstates:
                advancedstate = advancedstates.pop()
                cols[colidx].add(advancedstate)
                if advancedstate.is_complete():
                    if verbose:
                        print('completion:', advancedstate.symbol)
                    advancedstates += scan(cols[advancedstate.startidx], advancedstate.symbol, advancedstate)
                else:
                    cols[colidx] |= predict(colidx, advancedstate.nextsym())

        cols = [set() for _ in range(len(message)+1)]
        cols[0] |= predict(0, start)
        if verbose:
            pprint(cols[0])
        # column 0 is populated above, so the first item of the message belongs to column 1
        for colidx in range(1, len(message)+1):
            if verbose:
                print(colidx, message[colidx-1], len(cols[colidx-1]))
            advancedstates = scan(cols[colidx-1], message[colidx-1], 'scanner')
            completer(cols, advancedstates)
        return cols if returncols else self.isvalid(cols, start)

In [52]:
# https://adventofcode.com/2020/day/19
# The input has two sections separated by a blank line: the rules, then the messages.
with open('cfg example.txt') as f:  # context manager closes the handle
    rules, messages = f.read().split('\n\n')

# Build {symbol: ((sym, ...), ...)}: alternatives are separated by ' | ',
# symbols by whitespace, and the quotes around literal characters are dropped.
grammar = {}
for rule in rules.split('\n'):
    num, makefrom = rule.split(': ')
    makefrom = makefrom.replace('"', '')
    makefrom = tuple(makefrom.split(' | '))
    makefrom = tuple(tuple(option.split()) for option in makefrom)
    grammar[num] = makefrom

messages = messages.split('\n')
s = '0'
ear = Earley(grammar)

# parse() returns a bool here, so adding the results counts the accepted messages
ans = 0
for m in messages:
    ans += ear.parse(m, s)
# assert ans == 129
ans

2

In [58]:
from itertools import product
from aocutils.math import all_permutations

# Count the candidates yielded by all_permutations() that the grammar accepts.
# parse() returns True/False here, so each accepted candidate adds exactly one.
ans = 0
for option in all_permutations():
    if ear.parse(option, s):
        ans += 1
ans

8

In [47]:
# Parse the first message, keep the columns, and show how it was derived.
print(f"message {messages[0]}")
cols = ear.parse(messages[0], s, returncols=True)
# start=True seeds the walk from the completed start rule in the last column
ans = ear.find(cols, deque(), start=True)
ear.printparse(ans)
# full list of states on the path, with their completion flag
for state in ans:
    print(state, state.is_complete())

message ababbb
 0  : ()              | ('4', '1', '5')                idx(0) )
.. 4  : ()              | ('a',)                         idx(0) )
.. 1  : ()              | ('3', '2')                     idx(1) )
.... 3  : ()              | ('5', '4')                     idx(1) )
...... 5  : ()              | ('b',)                         idx(1) )
...... 4  : ()              | ('a',)                         idx(2) )
.... 2  : ()              | ('5', '5')                     idx(3) )
...... 5  : ()              | ('b',)                         idx(3) )
...... 5  : ()              | ('b',)                         idx(4) )
.. 5  : ()              | ('b',)                         idx(5) )
0  : ()              | ('4', '1', '5')                idx(0) ) False
4  : ()              | ('a',)                         idx(0) ) False
4  : ('a',)          | ()                             idx(0) Done) True
0  : ('4',)          | ('1', '5')                     idx(0) ) False
1  : ()              | ('3',