# Exercise 2: Bloom filter

In [146]:
# Implement a Bloom filter using bitarray
import bitarray as bit
from hashlib import sha3_256, sha256, blake2b

class BloomFilter:
    def __init__(self, size, hashes):
        self.size = size # Specify filter size
        self.hashes = hashes # Specify desired hash functions
        self.bitarray = bit.bitarray(size) # Initialize bitarray with defined size

    # Define hash functions, which return the bitarray index
    def my_hash(self, value):
        return int(sha256(value.lower().encode()).hexdigest(), 16) % self.size

    def my_hash2(self, value):
        return int(blake2b(value.lower().encode()).hexdigest(), 16) % self.size

    def my_hash3(self, value):
        return int(sha3_256(value.lower().encode()).hexdigest(), 16) % self.size 
    
    # Add a value to the Bloom filter
    # Note: Access hash functions as instance methods for flexibility to choose combinations
    def add(self, value):
        for hash in self.hashes: # Apply value through selected hash functions
            index = hash(self, value) # Get hashed value to bitarray index
            self.bitarray[index] = 1 # Set corresponding position in bitarray to 1 (occupied)

    # Check if a value is in the Bloom filter (may return false positives)
    def lookup(self, value):
        for hash in self.hashes:
            hashed = hash(self, value) # Access hash functions as instance methods
            index = hashed % self.size # Convert hashed value to index
            if self.bitarray[index] == 0: # Check if index position in bitarray is occupied
                return False
        return True

In [132]:
# Test the Bloom filter
add_test = BloomFilter(size = 100, hashes = [BloomFilter.my_hash, BloomFilter.my_hash2, BloomFilter.my_hash3])
add_test.add("Hello!")
print(add_test.lookup("Hello!"))   # True (potentially a false positive)
print(add_test.lookup("Goodbye!")) # False

True
False


In [138]:
# Store words.txt into Bloom filter (assuming all words are correctly spelled)
# Note: To avoid hash collisions, the absolute minimum for a corpus of 500,000 words with 3 hash functions is 150,000 bits.
# For a starting point, at least 1,000,000 (1e6) bits is recommended; 10,000,000 (1e7) bits to be safe.
words = BloomFilter(size = 10_000_000, hashes = [BloomFilter.my_hash, BloomFilter.my_hash2, BloomFilter.my_hash3])

with open("words.txt") as file:
    # Read word list one at a time
    for line in file:
        word = line.strip()
        words.add(word)

# Test the primed filter
print(words.lookup("cb&b634"))       # False
print(words.lookup("computational")) # True
print(words.lookup("methods"))       # True
print(words.lookup("for"))           # True
print(words.lookup("informatics"))   # True

False
True
True
True
True


In [134]:
# Use Bloom filter to create a spell checker
# Note: May return more false positives (suggesting a word not in the list) with smaller filters and fewer hash functions.
def spell_checker(word, BloomFilter):
    # If the word is in the specified Bloom filter, it may be correctly spelled
    if BloomFilter.lookup(word):
        return(f"'{word}' may be spelled correctly.")
    
    # If not, generate all possible single-letter substitutions and test them against Bloom filter
    word_suggestions = []
    for i in range(len(word)):
        for letter in "abcdefghijklmnopqrstuvwxyz":
            candidate = word[:i] + letter + word[i+1:] # Slice word to test every letter at every character position
            if BloomFilter.lookup(candidate):
                word_suggestions.append(candidate)

    # Return candidate words
    return word_suggestions

print(spell_checker("bruh", words))   # ['pruh', 'bluh', 'brum', 'brut']
print(spell_checker("moment", words)) # 'moment' may be spelled correctly.

['pruh', 'bluh', 'brum', 'brut']
'moment' may be spelled correctly.


In [64]:
# Test spell checker on typos.json
import json

def accuracy_checker(BloomFilter):
    # Load list of typos
    with open("typos.json") as file:
        typos = json.load(file) # Creates list of [typed_word, correct_word] pairs
    correct_count = 0
    
    # Check if correct word is produced by the specified Bloom filter and that it gives no more than 3 suggestions
    for typo in typos:
        if (typo[1] in spell_checker(typo[0], BloomFilter)) and (len(spell_checker(typo[0], BloomFilter)) <= 3):
            correct_count = correct_count + 1

    # Return spell checker accuracy
    return(correct_count / len(typos))

print("Suggestion accuracy:", round(accuracy_checker(words) * 100, 3), "%")


Suggestion accuracy: 45.8 %


In [144]:
test = BloomFilter(size = 10_000_000, hashes = [BloomFilter.my_hash, BloomFilter.my_hash2, BloomFilter.my_hash3])
with open("words.txt") as file:
    for line in file:
        word = line.strip()
        test.add(word)

In [145]:
print(spell_checker("bruh", test))   # ['pruh', 'bluh', 'brum', 'brut']
print(spell_checker("moment", test)) # 'moment' may be spelled correctly.

print(spell_checker("bruh", words))   # ['pruh', 'bluh', 'brum', 'brut']
print(spell_checker("moment", words)) # 'moment' may be spelled correctly.

['pruh', 'bluh', 'brum', 'brut']
'moment' may be spelled correctly.
['iruh', 'pruh', 'qruh', 'zruh', 'bcuh', 'bfuh', 'bluh', 'bouh', 'brgh', 'brnh', 'brwh', 'brxh', 'brul', 'brum', 'brut']
'moment' may be spelled correctly.


In [65]:
# Plot filter size against hash function choice (first, first two, all three)
import numpy as np
import pandas as pd

sizes = list(np.linspace(start = 1e5, stop = 1e7, num = 10, dtype = int)) # Generate evenly spaced Bloom filter sizes
filters = [BloomFilter(size = size, hashes = [BloomFilter.my_hash]),
           BloomFilter(size = size, hashes = [BloomFilter.my_hash, BloomFilter.my_hash2]),
           BloomFilter(size = size, hashes = [BloomFilter.my_hash, BloomFilter.my_hash2, BloomFilter.my_hash3])]

for size in sizes:
    accuracies = []
    for filter in filters:

accuracies = []
for size in sizes:
    one_hash_filter = BloomFilter(size = size, hashes = [BloomFilter.my_hash])
    accuracy = accuracy_checker(one_hash_filter)
    accuracies.append(accuracy)

one_hash_results = pd.DataFrame({"filter_size": sizes, "accuracy": accuracies})

accuracies = []
for size in sizes:
    two_hash_filter = BloomFilter(size = size, hashes = [BloomFilter.my_hash, BloomFilter.my_hash2])
    accuracy = accuracy_checker(two_hash_filter)
    accuracies.append(accuracy)

two_hash_results = pd.DataFrame({"filter_size": sizes, "accuracy": accuracies})

accuracies = []
for size in sizes:
    three_hash_filter = BloomFilter(size = size, hashes = [BloomFilter.my_hash, BloomFilter.my_hash2, BloomFilter.my_hash3])
    accuracy = accuracy_checker(three_hash_filter)
    accuracies.append(accuracy)

three_hash_results = pd.DataFrame({"filter_size": sizes, "accuracy": accuracies})

one_hash_results
two_hash_results
three_hash_results


KeyboardInterrupt: 