-
Notifications
You must be signed in to change notification settings - Fork 34
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Aho-Corasick multiple string matching algorithm (#8)
* Add implementation of Aho-Corasick Automaton * Add tests * Add tex+pdf
- Loading branch information
1 parent
faa7c7b
commit 412cda4
Showing
6 changed files
with
437 additions
and
0 deletions.
There are no files selected for viewing
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
from queue import Queue | ||
|
||
|
||
def create_aho_corasick_automaton(keywords, alphabet='ab'):
    """Build an Aho-Corasick automaton for ``keywords`` over ``alphabet``.

    Both arguments are forwarded unchanged to ``AhoCorasickAutomaton``;
    ``alphabet`` defaults to the two-letter alphabet ``'ab'``.
    """
    automaton = AhoCorasickAutomaton(keywords, alphabet)
    return automaton
|
||
|
||
def find_occurrences(text, n, ac_automaton):
    """Delegate the search of ``text`` (length ``n``) to ``ac_automaton``.

    Returns whatever ``ac_automaton.find_occurrences(text, n)`` returns.
    """
    search = ac_automaton.find_occurrences
    return search(text, n)
|
||
|
||
# pylint: disable=too-few-public-methods | ||
class AhoCorasickAutomaton:
    """Aho-Corasick automaton matching many keywords in one pass over a text.

    Keywords and texts are addressed from index 1: index 0 of every keyword
    and of the searched text is never read (the accompanying tests put a
    ``'#'`` sentinel there).
    """

    def __init__(self, keywords, alphabet):
        """Build the trie, the failure links and the full transition table.

        Args:
            keywords: iterable of ``(keyword, keyword_len)`` pairs; the
                characters of a keyword are ``keyword[1..keyword_len]``.
            alphabet: iterable of symbols the automaton has transitions
                for. Duplicates are ignored and every symbol that occurs in
                a keyword is added automatically.
        """
        # Both inputs are traversed more than once below, so materialise
        # them (the caller may pass generators).
        keywords = list(keywords)
        symbols = dict.fromkeys(alphabet)
        for keyword, keyword_len in keywords:
            symbols.update(dict.fromkeys(keyword[1:keyword_len + 1]))
        alphabet = tuple(symbols)

        self._root = AhoCorasickAutomaton.Node()
        self._construct_goto(keywords, alphabet)
        self._construct_fail(alphabet)
        self._construct_nxt(alphabet)

    def _construct_goto(self, keywords, alphabet):
        """Insert every keyword into the trie, then close the root."""
        for k, k_len in keywords:
            self._enter(k, k_len)

        # Symbols that start no keyword loop back to the root, so the
        # failure-link walk in _construct_fail always terminates there.
        for a in alphabet:
            if self._root.goto(a) is None:
                self._root.update_goto(a, self._root)

    def _enter(self, keyword, keyword_len):
        """Add ``keyword[1..keyword_len]`` to the trie and mark its end."""
        current_state = self._root
        j = 1

        # Follow the part of the keyword that is already in the trie.
        # The bound must be inclusive: stopping one character early would
        # make the loop below replace an existing edge (and the whole
        # subtree behind it) when the keyword is a prefix or a duplicate
        # of one entered earlier.
        while j <= keyword_len:
            next_state = current_state.goto(keyword[j])
            if next_state is None:
                break
            current_state = next_state
            j += 1

        # Create fresh nodes for the remaining suffix, if any.
        for a in keyword[j:keyword_len + 1]:
            next_state = AhoCorasickAutomaton.Node()
            current_state.update_goto(a, next_state)
            current_state = next_state

        # A keyword entered twice must still be reported only once.
        if keyword_len not in current_state.output():
            current_state.append_outputs([keyword_len])

    def _construct_fail(self, alphabet):
        """Compute failure links breadth-first and merge inherited outputs."""
        q = Queue()
        for a in alphabet:
            s = self._root.goto(a)
            if s is not self._root:
                q.put(s)
                s.update_fail(self._root)

        while not q.empty():
            current = q.get()
            for a in alphabet:
                child = current.goto(a)
                if child is None:
                    continue
                q.put(child)

                # Walk up the failure chain until some state has an
                # a-transition; the root has one for every symbol.
                fallback = current.fail()
                while fallback.goto(a) is None:
                    fallback = fallback.fail()

                child_fallback = fallback.goto(a)
                child.update_fail(child_fallback)
                # Keywords ending at the fallback state also end here.
                child.append_outputs(child_fallback.output())

    def _construct_nxt(self, alphabet):
        """Precompute ``nxt`` for every state and symbol, then drop goto/fail."""
        q = Queue()
        for a in alphabet:
            a_child = self._root.goto(a)
            self._root.update_nxt(a, a_child)
            if a_child is not self._root:
                q.put(a_child)
        self._root.use_only_nxt()

        while not q.empty():
            current = q.get()
            for a in alphabet:
                child = current.goto(a)
                if child is not None:
                    q.put(child)
                    current.update_nxt(a, child)
                else:
                    # The fallback state was dequeued earlier in this
                    # breadth-first pass, so its nxt table is complete.
                    fallback = current.fail()
                    current.update_nxt(a, fallback.nxt(a))
            current.use_only_nxt()

    def find_occurrences(self, text, n):
        """Yield ``(keyword, start_pos)`` for every match in ``text[1..n]``.

        ``start_pos`` is the index in ``text`` of the first character of
        the match. A text character that is not in the automaton's
        alphabet cannot belong to any keyword, so it resets the search to
        the root state.
        """
        root = self._root
        state = root
        for i in range(1, n + 1):
            try:
                state = state.nxt(text[i])
            except KeyError:
                state = root
            for keyword_len in state.output():
                start_pos = i - keyword_len + 1
                yield text[start_pos:i + 1], start_pos

    class Node:
        """One automaton state: trie edges, failure link, outputs, nxt table."""

        def __init__(self):
            self._goto, self._fail, self._output, self._nxt = {}, None, [], {}

        def goto(self, a):
            """Return the trie child for symbol ``a`` or ``None``."""
            return self._goto.get(a, None)

        def update_goto(self, a, target_node):
            """Set the trie edge for symbol ``a``."""
            self._goto[a] = target_node

        def output(self):
            """Return the list of keyword lengths that end in this state."""
            return self._output

        def append_outputs(self, outputs):
            """Extend this state's output list in place with ``outputs``."""
            self._output += outputs

        def fail(self):
            """Return the failure link of this state."""
            return self._fail

        def update_fail(self, target_node):
            """Set the failure link of this state."""
            self._fail = target_node

        def nxt(self, a):
            """Return the precomputed successor for ``a``.

            Raises ``KeyError`` when no transition for ``a`` was stored.
            """
            return self._nxt[a]

        def update_nxt(self, a, target_node):
            """Store the precomputed successor for symbol ``a``."""
            self._nxt[a] = target_node

        def use_only_nxt(self):
            """Delete the goto table and failure link of this state.

            After this call ``goto()`` and ``fail()`` raise
            ``AttributeError``; only ``nxt()`` and ``output()`` remain usable.
            """
            del self._goto
            del self._fail
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
import unittest | ||
from random import randint | ||
|
||
from exact_multiple_string_matching.aho_corasick import \ | ||
create_aho_corasick_automaton, find_occurrences | ||
from exact_string_matching.forward import brute_force | ||
from generator.rand import random_word | ||
|
||
|
||
class TestAhoCorasick(unittest.TestCase):
    """Tests for the Aho-Corasick automaton.

    Texts and keywords carry a leading ``'#'`` so that real characters start
    at index 1; reported start positions are indices into that padded text.
    """

    def test_single_keyword(self):
        text, n = '#aabccabcab', 10
        automaton = self._create_from(['abc'])

        found = list(find_occurrences(text, n, automaton))
        self.assertListEqual(found, [('abc', 2), ('abc', 6)])

    def test_non_overlapping(self):
        text, n = '#aabccabcab', 10
        automaton = self._create_from(['aa', 'bcc', 'bcab'])

        found = list(find_occurrences(text, n, automaton))
        self.assertListEqual(found, [('aa', 1), ('bcc', 3), ('bcab', 7)])

    def test_overlapping(self):
        text, n = '#eshers', 6
        automaton = self._create_from(['he', 'she', 'his', 'her', 'hers'])

        found = set(find_occurrences(text, n, automaton))
        self.assertSetEqual(
            found, {('she', 2), ('he', 3), ('hers', 3), ('her', 3)})

    def test_pessimistic(self):
        text, n = '#' + 'a' * 20, 20
        automaton = self._create_from([i * 'a' for i in range(1, 6)])

        found = set(find_occurrences(text, n, automaton))
        self.assertEqual(len(found), 20 + 19 + 18 + 17 + 16)

    def test_no_match(self):
        text, n = '#abababab', 8
        automaton = self._create_from(['bb', 'abba'])

        self.assertFalse(list(find_occurrences(text, n, automaton)))

    def test_random(self):
        """Compare the automaton with brute-force search on random inputs."""
        n, m, A = 500, 30, ['a', 'b', 'c']
        for _ in range(100):
            t = random_word(n, A)
            patterns = {random_word(randint(2, 10), A) for _ in range(m)}
            automaton = self._create_from(patterns)

            expected = set()
            for p in patterns:
                # NOTE(review): the pattern length is passed as len(p) + 1
                # while the text length is passed as n - confirm this matches
                # brute_force's convention.
                starts = brute_force(t, f'#{p}', n, len(p) + 1)
                # set.union() returns a new set and leaves `expected`
                # untouched; update() accumulates in place.
                expected.update((p, i) for i in starts)

            found = set(find_occurrences(t, n, automaton))

            self.assertSetEqual(expected, found)

    @staticmethod
    def _create_from(patterns):
        """Build an automaton for ``patterns`` over exactly their characters."""
        keywords = ((f'#{p}', len(p)) for p in patterns)
        alphabet = {a for p in patterns for a in p}
        return create_aho_corasick_automaton(keywords, alphabet)
Binary file not shown.
Oops, something went wrong.