In [1]:
import pandas as pd
from lemma.parse import parse_dict
from lemma.utils import dump_lines

In [2]:
# Parse data/dict.opcorpora.xml and build a DataFrame from its 'lemmas'
# records, using the matching 'lemmas' column names.
# NOTE(review): parse_dict presumably returns two mappings keyed by table
# name (records, column names) — confirm in lemma.parse.
data, cols = parse_dict('data/dict.opcorpora.xml')
df = pd.DataFrame(data['lemmas'], columns=cols['lemmas'])

In [3]:
# Sorted list of the 'word' column. The ordering matters: the bisect-based
# Matcher defined later in this notebook searches data/words.txt with
# bisect_left, which requires sorted input.
# NOTE(review): dump_lines presumably writes one item per line — confirm.
words = sorted(df['word'].to_list())
dump_lines(words, 'data/words.txt')

In [4]:
# One "<word> 1" line per word: every entry gets the same count of 1, so
# the count column carries no real frequency information.
freq_dict = [f'{w} 1' for w in words]
dump_lines(freq_dict, 'data/freq_dict.txt')

In [5]:
# pip install symspellpy
from symspellpy import SymSpell
from symspellpy import Verbosity

ss = SymSpell()
# ss.create_dictionary('data/corpus.txt')
# Load the "<word> 1" file written above. The positional 0 and 1 are
# presumably the term and count column indexes — confirm against the
# symspellpy load_dictionary documentation.
ss.load_dictionary('data/freq_dict.txt', 0, 1)

True

In [6]:
# Keyword arguments shared by every ss.lookup(...) call in the cells below.
lookup_params = {
    'max_edit_distance': 2,
    'verbosity': Verbosity.CLOSEST
}

In [7]:
%timeit ss.lookup('надежный', **lookup_params)

33.8 µs ± 442 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)


In [8]:
for s in ss.lookup('надежный', **lookup_params): print(s)

падежный, 1, 2
надёжный, 1, 2


In [None]:
for s in ss.lookup('рабогтает', **lookup_params): print(s)

In [9]:
for s in ss.lookup('админисраторамм', **lookup_params): print(s)

администраторам, 2, 1
администраторами, 2, 1


In [11]:
import bisect

class NFA(object):
    """Nondeterministic finite automaton with epsilon and wildcard edges.

    Transitions are stored as ``{src: {label: set_of_dest_states}}``. A label
    is either a concrete input symbol or one of the sentinels ``ANY`` and
    ``EPSILON`` defined below. States can be any hashable value.
    """

    # Label of an edge that matches any input symbol.
    ANY = object()
    # Label of an edge that is followed without consuming input.
    EPSILON = object()

    def __init__(self, start_state):
        """Create an automaton with no transitions that starts in ``start_state``."""
        self.transitions = {}
        self.final_states = set()
        self._start_state = start_state

    @property
    def start_state(self):
        """Frozenset holding the start state plus its epsilon closure."""
        return frozenset(self._expand(set([self._start_state])))

    def add_transition(self, src, input, dest):
        """Add an edge ``src --input--> dest``; parallel edges accumulate."""
        self.transitions.setdefault(src, {}).setdefault(input, set()).add(dest)

    def add_final_state(self, state):
        """Mark ``state`` as accepting."""
        self.final_states.add(state)

    def is_final(self, states):
        """Return the accepting states contained in ``states``.

        The result is a (possibly empty) set, so it can be used directly in a
        boolean context.
        """
        return self.final_states.intersection(states)

    def _expand(self, states):
        """Add everything reachable via EPSILON edges to ``states``.

        ``states`` is mutated in place and also returned.
        """
        frontier = set(states)
        while frontier:
            state = frontier.pop()
            new_states = self.transitions.get(state, {}).get(NFA.EPSILON, set()).difference(states)
            frontier.update(new_states)
            states.update(new_states)
        return states

    def next_state(self, states, input):
        """Return the epsilon-closed set reached from ``states`` on ``input``.

        Edges labelled ``ANY`` are followed in addition to edges labelled with
        ``input`` itself. The result is a frozenset.
        """
        dest_states = set()
        for state in states:
            state_transitions = self.transitions.get(state, {})
            dest_states.update(state_transitions.get(input, []))
            dest_states.update(state_transitions.get(NFA.ANY, []))
        return frozenset(self._expand(dest_states))

    def get_inputs(self, states):
        """Return every edge label (sentinels included) leaving ``states``."""
        inputs = set()
        for state in states:
            inputs.update(self.transitions.get(state, {}).keys())
        return inputs

    def to_dfa(self):
        """Build an equivalent DFA whose states are frozensets of NFA states.

        ``ANY`` edges become the DFA state's default transition; every other
        label becomes an explicit transition. EPSILON edges are already folded
        into the state sets by ``next_state`` and are skipped here.
        """
        start = self.start_state
        dfa = DFA(start)
        # The start state is never "discovered" by the loop below, so its
        # finality has to be recorded here; otherwise an automaton that
        # accepts the empty string would yield a DFA that rejects it.
        if self.is_final(start):
            dfa.add_final_state(start)
        frontier = [start]
        # Seeding ``seen`` with the start state prevents it from being queued
        # and processed a second time if some transition leads back to it.
        seen = {start}
        while frontier:
            current = frontier.pop()
            for input in self.get_inputs(current):
                # Sentinels are unique objects, so identity is the right test.
                if input is NFA.EPSILON:
                    continue
                new_state = self.next_state(current, input)
                if new_state not in seen:
                    seen.add(new_state)
                    frontier.append(new_state)
                    if self.is_final(new_state):
                        dfa.add_final_state(new_state)
                if input is NFA.ANY:
                    dfa.set_default_transition(current, new_state)
                else:
                    dfa.add_transition(current, input, new_state)
        return dfa

class DFA(object):
    """Deterministic finite automaton with an optional default edge per state.

    ``transitions`` maps ``state -> {label: dest}`` and ``defaults`` maps
    ``state -> dest`` for the edge taken when no explicit label matches.
    """

    # Largest code point chr() accepts; no label can follow it.
    _MAX_CODE_POINT = 0x10FFFF

    def __init__(self, start_state):
        """Create an automaton with no transitions that starts in ``start_state``."""
        self.start_state = start_state
        self.transitions = {}
        self.defaults = {}
        self.final_states = set()

    def add_transition(self, src, input, dest):
        """Set the explicit edge ``src --input--> dest`` (replaces any previous one)."""
        self.transitions.setdefault(src, {})[input] = dest

    def set_default_transition(self, src, dest):
        """Set the edge taken from ``src`` when no explicit label matches."""
        self.defaults[src] = dest

    def add_final_state(self, state):
        """Mark ``state`` as accepting."""
        self.final_states.add(state)

    def is_final(self, state):
        """Return True if ``state`` is accepting."""
        return state in self.final_states

    def next_state(self, src, input):
        """Return the state reached from ``src`` on ``input``.

        Falls back to the default edge of ``src``; returns None when neither
        an explicit nor a default edge exists.
        """
        state_transitions = self.transitions.get(src, {})
        return state_transitions.get(input, self.defaults.get(src, None))

    def next_valid_string(self, input):
        """Return the smallest accepted string that is >= ``input``.

        Returns ``input`` itself when it is accepted, and None when no
        accepted string at or after ``input`` can be found.
        """
        state = self.start_state
        stack = []

        # Evaluate the DFA as far as possible, remembering for each position
        # the prefix consumed so far, the state, and the symbol taken.
        for i, x in enumerate(input):
            stack.append((input[:i], state, x))
            state = self.next_state(state, x)
            if state is None:
                break
        else:
            # Whole input consumed. Using ``input`` rather than ``input[:i+1]``
            # keeps this branch valid for the empty string, where the loop
            # never binds ``i``.
            stack.append((input, state, None))

        if self.is_final(state):
            # Input word is already valid
            return input

        # Perform a 'wall following' search for the lexicographically smallest
        # accepting state.
        while stack:
            path, state, x = stack.pop()
            x = self.find_next_edge(state, x)
            if x is not None:
                path += x
                state = self.next_state(state, x)
                if self.is_final(state):
                    return path
                stack.append((path, state, None))
        return None

    def find_next_edge(self, s, x):
        """Return the smallest label leaving ``s`` that is greater than ``x``.

        ``x`` of None means "no label tried yet", so the search starts at
        ``'\\0'``. Returns None when ``s`` has no such edge.
        """
        if x is None:
            x = '\0'
        elif ord(x) >= self._MAX_CODE_POINT:
            # Nothing sorts after the last code point; chr() would raise.
            return None
        else:
            x = chr(ord(x) + 1)
        state_transitions = self.transitions.get(s, {})
        # A default edge accepts any symbol, so the very next one is usable.
        if x in state_transitions or s in self.defaults:
            return x
        labels = sorted(state_transitions.keys())
        pos = bisect.bisect_left(labels, x)
        if pos < len(labels):
            return labels[pos]
        return None

def levenshtein_automata(term, k):
    """Build the NFA accepting strings within ``k`` edits of ``term``.

    A state is a pair ``(chars of term consumed, edits spent)``. The start
    state is ``(0, 0)``; every ``(len(term), e)`` with ``e <= k`` is final.
    """
    length = len(term)
    automaton = NFA((0, 0))

    for pos, char in enumerate(term):
        for errors in range(k + 1):
            here = (pos, errors)
            # Exact match: advance in the term at no cost.
            automaton.add_transition(here, char, (pos + 1, errors))
            if errors >= k:
                continue
            # Consume an input symbol without advancing in the term.
            automaton.add_transition(here, NFA.ANY, (pos, errors + 1))
            # Advance in the term without consuming an input symbol.
            automaton.add_transition(here, NFA.EPSILON, (pos + 1, errors + 1))
            # Consume an input symbol and advance in the term.
            automaton.add_transition(here, NFA.ANY, (pos + 1, errors + 1))

    # After the whole term: extra input symbols each cost one more edit.
    for errors in range(k + 1):
        end = (length, errors)
        if errors < k:
            automaton.add_transition(end, NFA.ANY, (length, errors + 1))
        automaton.add_final_state(end)

    return automaton

def find_all_matches(word, k, lookup_func):
    """Yield every database word within Levenshtein distance ``k`` of ``word``.

    The search alternates between the automaton and the database: the
    automaton proposes the next string it accepts, the database answers with
    its first entry at or after that string, and the two leapfrog each other
    until either side runs out.

    Args:
        word: The word to look up.
        k: Maximum edit distance.
        lookup_func: A single-argument function returning the first word in
            the database that is greater than or equal to its argument, or a
            falsy value when there is none.
    Yields:
        Each database word accepted by the automaton, in database order.
    """
    dfa = levenshtein_automata(word, k).to_dfa()
    candidate = dfa.next_valid_string(u'\0')
    while candidate:
        found = lookup_func(candidate)
        if not found:
            break
        if found != candidate:
            # Database skipped ahead; let the automaton catch up from there.
            resume_from = found
        else:
            yield candidate
            # Smallest string strictly greater than the word just reported.
            resume_from = found + u'\0'
        candidate = dfa.next_valid_string(resume_from)

class Matcher(object):
    """Callable lookup over a sorted list that counts how often it is probed.

    Calling the instance with ``w`` returns the first list entry that is
    greater than or equal to ``w``, or None when every entry is smaller.
    """

    def __init__(self, l):
        # Number of lookups performed so far.
        self.probes = 0
        # The list being searched; bisect requires it to be sorted.
        self.l = l

    def __call__(self, w):
        self.probes += 1
        idx = bisect.bisect_left(self.l, w)
        return self.l[idx] if idx < len(self.l) else None

In [12]:
from lemma.utils import load_lines
# Matcher searches this list with bisect_left, so it must be sorted;
# data/words.txt was written from sorted(...) in an earlier cell.
# NOTE(review): assumes load_lines returns the lines as a list without
# trailing newlines — confirm in lemma.utils.
matcher = Matcher(load_lines('data/words.txt'))

In [13]:
list(find_all_matches('надежный', 1, matcher))

['надёжный', 'падежный']

In [14]:
list(find_all_matches('админисраторамм', 2, matcher))

['администраторам', 'администраторами']

In [15]:
%timeit list(find_all_matches('админисраторамм', 2, matcher))

106 ms ± 875 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [None]:
# apt install swig3.0
# pip install jamspell
# !wget https://github.com/bakwc/JamSpell-models/raw/master/ru.tar.gz
# !tar -xvf ru.tar.gz

In [None]:
import jamspell
jsp = jamspell.TSpellCorrector()
# Stop here if the model cannot be loaded.
# NOTE(review): LoadLangModel presumably returns a falsy value on failure;
# the assert is skipped when Python runs with -O — confirm both.
assert jsp.LoadLangModel('ru_small.bin')

In [None]:
jsp.FixFragment("надежныр механизм")