<a href="https://colab.research.google.com/github/newmantic/Aho_Corasick/blob/main/Aho_Corasick.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from collections import deque, defaultdict

class AhoCorasick:
    """Multi-pattern matcher built on an Aho-Corasick automaton.

    States are integers; state 0 is the root. The automaton is stored in
    three tables:

    * ``trie``   -- maps ``(state, char)`` to the child state.
    * ``fail``   -- maps every non-root state to its failure (fallback) state.
    * ``output`` -- maps a state to the patterns that end when it is reached.
    """

    def __init__(self, keywords):
        """
        Initialize the Aho-Corasick algorithm with a collection of keywords (patterns).
        :param keywords: Iterable of patterns to search for in the text.
                         Empty patterns and repeated patterns are ignored
                         when the automaton is built.
        """
        # Materialise once so generators work and can be both counted and iterated.
        keywords = list(keywords)
        # Number of keywords as supplied (before empties/duplicates are dropped).
        self.num_keywords = len(keywords)
        self.trie = {}
        self.output = defaultdict(list)
        self.fail = {}
        self.build_trie(keywords)
        self.build_failure_function()

    def build_trie(self, keywords):
        """
        Build the Trie from the keywords, replacing any previous trie/output.
        :param keywords: Iterable of patterns to add to the trie.
        """
        self.trie = {}
        self.output = defaultdict(list)
        last_state = 0  # highest state number handed out so far (0 is the root)

        for keyword in keywords:
            # An empty pattern would sit on the root, be inherited by every
            # state, and be reported with a meaningless index -- skip it.
            if not keyword:
                continue

            node = 0
            for char in keyword:
                edge = (node, char)
                child = self.trie.get(edge)
                if child is None:
                    last_state += 1
                    child = self.trie[edge] = last_state
                node = child

            # A keyword given twice must not be reported twice per match.
            if keyword not in self.output[node]:
                self.output[node].append(keyword)

    def build_failure_function(self):
        """
        Build the failure function using BFS over the trie.

        Also extends ``output`` so that each state additionally lists the
        patterns recognised at its failure state (i.e. patterns that are
        proper suffixes of the path to that state).
        """
        self.fail = {}

        # Group edges by parent once, so the BFS does not have to rescan the
        # whole trie for every state it dequeues.
        children = defaultdict(list)
        for (parent, char), child in self.trie.items():
            children[parent].append((char, child))

        # Depth-1 states always fall back to the root.
        queue = deque()
        for _char, child in children.get(0, ()):
            self.fail[child] = 0
            queue.append(child)

        # BFS guarantees that fail/output of every shallower state is final
        # before it is consulted for a deeper one.
        while queue:
            parent = queue.popleft()

            for char, child in children.get(parent, ()):
                queue.append(child)

                # Follow failure links from the parent until a state with an
                # outgoing `char` edge is found (or the root is reached).
                state = self.fail[parent]
                while state != 0 and (state, char) not in self.trie:
                    state = self.fail[state]

                self.fail[child] = self.trie.get((state, char), 0)

                # .get avoids creating empty entries in the defaultdict.
                inherited = self.output.get(self.fail[child])
                if inherited:
                    self.output[child].extend(inherited)

    def search(self, text):
        """
        Search the text for all patterns using the Aho-Corasick algorithm.
        :param text: The text in which to search for the patterns.
        :return: A dictionary where keys are the found patterns and values are
                 lists of 0-based starting indices, in increasing order.
                 Patterns that never occur are absent from the dictionary.
        """
        node = 0
        results = defaultdict(list)

        for i, char in enumerate(text):
            # Fall back along failure links until `char` can be consumed.
            while node != 0 and (node, char) not in self.trie:
                node = self.fail[node]

            # Only reachable default is the root having no `char` edge.
            node = self.trie.get((node, char), 0)

            # .get keeps search() from mutating self.output.
            for pattern in self.output.get(node, ()):
                # `i` is the index of the pattern's last character.
                results[pattern].append(i - len(pattern) + 1)

        return dict(results)



In [2]:
def test_case_1():
    """Check patterns that share a prefix ("ab"/"abc") alongside one that straddles them ("ca")."""
    text = "abccababcabca"
    patterns = ["abc", "ab", "ca"]
    ac = AhoCorasick(patterns)
    result = ac.search(text)
    print(f"Matches found: {result}")
    # 0-based start indices of each pattern in `text`.
    expected = {
        "abc": [0, 6, 9],
        "ab": [0, 4, 6, 9],
        "ca": [3, 8, 11],
    }
    # Verify instead of only printing, so a wrong expectation cannot go unnoticed.
    assert result == expected, f"expected {expected}, got {result}"

test_case_1()

Matches found: {'ab': [0, 4, 6, 9], 'abc': [0, 6, 9], 'ca': [3, 8, 11]}


In [3]:
def test_case_2():
    """Print the matches for two nested patterns in a run of one repeated character."""
    matcher = AhoCorasick(["aa", "aaa"])
    result = matcher.search("aaaaaa")
    # Every occurrence overlaps its neighbours, so the expected result is:
    #   "aa"  -> [0, 1, 2, 3, 4]
    #   "aaa" -> [0, 1, 2, 3]
    print(f"Matches found: {result}")

test_case_2()

Matches found: {'aa': [0, 1, 2, 3, 4], 'aaa': [0, 1, 2, 3]}


In [4]:
def test_case_3():
    """Print the matches when none of the patterns occur in the text."""
    absent_patterns = ["xyz", "123"]
    result = AhoCorasick(absent_patterns).search("abcdefgh")
    # No pattern occurs, so the expected result is an empty dict: {}
    print(f"Matches found: {result}")

test_case_3()

Matches found: {}


In [5]:
def test_case_4():
    """Check three non-overlapping words embedded in a longer string."""
    text = "thisisatesttext"
    patterns = ["this", "test", "text"]
    ac = AhoCorasick(patterns)
    result = ac.search(text)
    print(f"Matches found: {result}")
    # 0-based start indices: "this" = text[0:4], "test" = text[7:11], "text" = text[11:15].
    expected = {
        "this": [0],
        "test": [7],
        "text": [11],
    }
    # Verify instead of only printing, so a wrong expectation cannot go unnoticed.
    assert result == expected, f"expected {expected}, got {result}"

test_case_4()

Matches found: {'this': [0], 'test': [7], 'text': [11]}


In [6]:
def test_case_5():
    """Print the matches for a long periodic text that ends with the longer pattern."""
    haystack = "ab" * 1000 + "abc"
    matcher = AhoCorasick(["ab", "abc"])
    result = matcher.search(haystack)
    # Expected result:
    #   "ab"  -> every even index from 0 through 2000 (0, 2, 4, ..., 1998, 2000)
    #   "abc" -> [2000]
    print(f"Matches found: {result}")

test_case_5()

Matches found: {'ab': [0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100, 102, 104, 106, 108, 110, 112, 114, 116, 118, 120, 122, 124, 126, 128, 130, 132, 134, 136, 138, 140, 142, 144, 146, 148, 150, 152, 154, 156, 158, 160, 162, 164, 166, 168, 170, 172, 174, 176, 178, 180, 182, 184, 186, 188, 190, 192, 194, 196, 198, 200, 202, 204, 206, 208, 210, 212, 214, 216, 218, 220, 222, 224, 226, 228, 230, 232, 234, 236, 238, 240, 242, 244, 246, 248, 250, 252, 254, 256, 258, 260, 262, 264, 266, 268, 270, 272, 274, 276, 278, 280, 282, 284, 286, 288, 290, 292, 294, 296, 298, 300, 302, 304, 306, 308, 310, 312, 314, 316, 318, 320, 322, 324, 326, 328, 330, 332, 334, 336, 338, 340, 342, 344, 346, 348, 350, 352, 354, 356, 358, 360, 362, 364, 366, 368, 370, 372, 374, 376, 378, 380, 382, 384, 386, 388, 390, 392, 394, 396, 398, 400, 402, 404, 406, 408, 410, 41