Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aho-Corasick multiple string matching algorithm #8

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Empty file.
122 changes: 122 additions & 0 deletions exact_multiple_string_matching/aho_corasick.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
from queue import Queue


def create_aho_corasick_automaton(keywords, alphabet='ab'):
    """Build an Aho-Corasick automaton for the given keywords.

    Args:
        keywords: forwarded unchanged to ``AhoCorasickAutomaton``.
        alphabet: forwarded unchanged to ``AhoCorasickAutomaton``;
            defaults to the two-letter alphabet ``'ab'``.

    Returns:
        The newly constructed ``AhoCorasickAutomaton``.
    """
    automaton = AhoCorasickAutomaton(keywords, alphabet)
    return automaton


def find_occurrences(text, n, ac_automaton):
    """Search ``text`` with an already built automaton.

    Args:
        text: the text to scan, passed through to the automaton.
        n: the text length, passed through to the automaton.
        ac_automaton: object exposing ``find_occurrences(text, n)``.

    Returns:
        Whatever ``ac_automaton.find_occurrences(text, n)`` returns.
    """
    occurrences = ac_automaton.find_occurrences(text, n)
    return occurrences


# pylint: disable=too-few-public-methods
class AhoCorasickAutomaton:
    """Aho-Corasick automaton that finds several keywords in one text pass.

    Strings follow a 1-based convention: a keyword is given as a pair
    ``(keyword, keyword_len)`` and only ``keyword[1..keyword_len]`` is
    inserted; a text of length ``n`` is scanned at indices ``1..n``.
    Index 0 of both is a placeholder that the automaton never inspects.
    """

    def __init__(self, keywords, alphabet):
        """Build the goto, failure and full transition (``nxt``) tables.

        Args:
            keywords: iterable of ``(keyword, keyword_len)`` pairs.
            alphabet: iterable of hashable symbols. Symbols occurring in
                the keywords are added automatically, so every trie edge
                gets failure and ``nxt`` links.
        """
        # Both arguments are needed more than once below, so one-shot
        # iterators (e.g. generators) must be materialised first.
        keywords = list(keywords)

        # A dict keeps the symbols unique (a repeated symbol would enqueue
        # nodes twice during the BFS passes and duplicate their outputs)
        # while preserving the caller's order.
        symbols = dict.fromkeys(alphabet)
        for keyword, keyword_len in keywords:
            symbols.update(dict.fromkeys(keyword[1:keyword_len + 1]))
        alphabet = tuple(symbols)

        self._root = AhoCorasickAutomaton.Node()
        self._construct_goto(keywords, alphabet)
        self._construct_fail(alphabet)
        self._construct_nxt(alphabet)

    def _construct_goto(self, keywords, alphabet):
        """Insert all keywords into the trie and add the root self-loops."""
        for k, k_len in keywords:
            self._enter(k, k_len)

        # Self-loops are added only after every keyword is inserted, so
        # _enter never mistakes one for a real trie edge. They guarantee
        # that the failure-link walk in _construct_fail terminates.
        for a in alphabet:
            if self._root.goto(a) is None:
                self._root.update_goto(a, self._root)

    def _enter(self, keyword, keyword_len):
        """Insert ``keyword[1..keyword_len]`` into the trie.

        Existing edges are reused for every character, including the last
        one, so a keyword that is a prefix of (or equal to) an already
        inserted keyword marks the existing node instead of replacing it.
        """
        current_state = self._root
        for a in keyword[1:keyword_len + 1]:
            next_state = current_state.goto(a)
            if next_state is None:
                next_state = AhoCorasickAutomaton.Node()
                current_state.update_goto(a, next_state)
            current_state = next_state

        # Register the keyword once even when it is supplied repeatedly.
        if keyword_len not in current_state.output():
            current_state.append_outputs([keyword_len])

    def _construct_fail(self, alphabet):
        """Compute failure links breadth-first and merge inherited outputs."""
        q = Queue()
        for a in alphabet:
            s = self._root.goto(a)
            if s is not self._root:
                q.put(s)
                s.update_fail(self._root)

        while not q.empty():
            current = q.get()
            for a in alphabet:
                child = current.goto(a)
                if child is None:
                    continue
                q.put(child)

                # Follow failure links until some state can read `a`; the
                # root can read every alphabet symbol, so this terminates.
                fallback = current.fail()
                while fallback.goto(a) is None:
                    fallback = fallback.fail()

                child_fallback = fallback.goto(a)
                child.update_fail(child_fallback)
                # Keywords ending at the failure target also end here.
                child.append_outputs(child_fallback.output())

    def _construct_nxt(self, alphabet):
        """Turn goto + failure links into a total transition table.

        Nodes are handled breadth-first, so when a missing transition is
        copied from the failure target, that target's ``nxt`` table has
        already been filled in.
        """
        q = Queue()
        for a in alphabet:
            a_child = self._root.goto(a)
            self._root.update_nxt(a, a_child)
            if a_child is not self._root:
                q.put(a_child)
        self._root.use_only_nxt()

        while not q.empty():
            current = q.get()
            for a in alphabet:
                child = current.goto(a)
                if child is not None:
                    q.put(child)
                    current.update_nxt(a, child)
                else:
                    current.update_nxt(a, current.fail().nxt(a))
            current.use_only_nxt()

    def find_occurrences(self, text, n):
        """Yield every keyword occurrence in ``text[1..n]``.

        Args:
            text: the text to scan; index 0 is not inspected.
            n: index of the last character to read.

        Yields:
            ``(matched_substring, start_pos)`` pairs, where ``start_pos``
            is the 1-based index at which the match begins.
        """
        root = self._root
        state = root
        for i in range(1, n + 1):
            # A symbol absent from the transition table occurs in no
            # keyword, so no match can span it: restart from the root.
            state = state.nxt_or(text[i], root)
            for keyword_len in state.output():
                start_pos = i - keyword_len + 1
                yield text[start_pos:i + 1], start_pos

    class Node:
        """A single automaton state."""

        def __init__(self):
            # _goto: trie edges, _fail: failure link,
            # _output: lengths of keywords ending in this state,
            # _nxt: total transition table used while scanning.
            self._goto, self._fail, self._output, self._nxt = {}, None, [], {}

        def goto(self, a):
            """Return the trie successor for ``a`` or None if absent."""
            return self._goto.get(a, None)

        def update_goto(self, a, target_node):
            """Set the trie successor for ``a``."""
            self._goto[a] = target_node

        def output(self):
            """Return the list of keyword lengths recognised here."""
            return self._output

        def append_outputs(self, outputs):
            """Extend this node's outputs in place with ``outputs``."""
            self._output += outputs

        def fail(self):
            """Return the failure link."""
            return self._fail

        def update_fail(self, target_node):
            """Set the failure link."""
            self._fail = target_node

        def nxt(self, a):
            """Return the transition for ``a``; raises KeyError if unset."""
            return self._nxt[a]

        def nxt_or(self, a, default):
            """Return the transition for ``a`` or ``default`` if unset."""
            return self._nxt.get(a, default)

        def update_nxt(self, a, target_node):
            """Set the transition for ``a``."""
            self._nxt[a] = target_node

        def use_only_nxt(self):
            """Drop the goto and failure tables once ``nxt`` is complete.

            After this call ``goto()`` and ``fail()`` raise AttributeError.
            """
            del self._goto
            del self._fail
65 changes: 65 additions & 0 deletions test/test_aho_corasick.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import unittest
from random import randint

from exact_multiple_string_matching.aho_corasick import \
create_aho_corasick_automaton, find_occurrences
from exact_string_matching.forward import brute_force
from generator.rand import random_word


class TestAhoCorasick(unittest.TestCase):
    """Tests for the Aho-Corasick automaton.

    Texts carry a leading '#' placeholder so that real characters start
    at index 1; reported positions are therefore 1-based.
    """

    def test_single_keyword(self):
        """A single keyword is found at each of its occurrences."""
        text, n = '#aabccabcab', 10
        automaton = self._create_from(['abc'])

        found = list(find_occurrences(text, n, automaton))
        self.assertListEqual(found, [('abc', 2), ('abc', 6)])

    def test_non_overlapping(self):
        """Several keywords whose occurrences do not overlap."""
        text, n = '#aabccabcab', 10
        automaton = self._create_from(['aa', 'bcc', 'bcab'])

        found = list(find_occurrences(text, n, automaton))
        self.assertListEqual(found, [('aa', 1), ('bcc', 3), ('bcab', 7)])

    def test_overlapping(self):
        """Keywords that are suffixes/prefixes of one another."""
        text, n = '#eshers', 6
        automaton = self._create_from(['he', 'she', 'his', 'her', 'hers'])

        found = set(find_occurrences(text, n, automaton))
        self.assertSetEqual(
            found, {('she', 2), ('he', 3), ('hers', 3), ('her', 3)})

    def test_pessimistic(self):
        """Every position matches several nested keywords."""
        text, n = '#' + 'a' * 20, 20
        automaton = self._create_from([i * 'a' for i in range(1, 6)])

        found = set(find_occurrences(text, n, automaton))
        self.assertEqual(len(found), 20 + 19 + 18 + 17 + 16)

    def test_no_match(self):
        """No keyword occurs in the text."""
        text, n = '#abababab', 8
        automaton = self._create_from(['bb', 'abba'])

        self.assertFalse(list(find_occurrences(text, n, automaton)))

    def test_random(self):
        """Compare against brute-force search on random inputs."""
        n, m, A = 500, 30, ['a', 'b', 'c']
        for _ in range(100):
            t = random_word(n, A)
            patterns = {random_word(randint(2, 10), A) for _ in range(m)}
            automaton = self._create_from(patterns)

            expected = set()
            for p in patterns:
                # NOTE(review): the pattern/length arguments assume
                # particular conventions of random_word and brute_force
                # that are not visible here - confirm '#' handling and
                # whether the length should be len(p) or len(p) + 1.
                starts = brute_force(t, f'#{p}', n, len(p) + 1)
                # set.union() returns a new set and leaves `expected`
                # untouched; update() accumulates in place.
                expected.update({(p, i) for i in starts})

            found = set(find_occurrences(t, n, automaton))

            self.assertSetEqual(expected, found)

    @staticmethod
    def _create_from(patterns):
        """Build an automaton from plain patterns (no '#' placeholder).

        Each pattern is prefixed with '#' and paired with its length; the
        alphabet is the set of characters used by the patterns.
        """
        keywords = ((f'#{p}', len(p)) for p in patterns)
        alphabet = {a for p in patterns for a in p}
        return create_aho_corasick_automaton(keywords, alphabet)
Binary file added text/PiotrMikolajczyk/Aho_Corasick_Automaton.pdf
Binary file not shown.