# Assignment 1

1. Реализовать Блум фильтр с помощью битового массива.
Например, вы можете использовать [Битовые операции](https://wiki.python.org/moin/BitwiseOperators) или библиотеку bitarray.

2. Провести численный эксперимент при false postive rate = 0.05, и количестве объектов S = 1 000 000.
Убедится, полученные на семинаре оптимальные параметры фильтра позволяют достичь заявленного false positive rate.
Посчитать  $\frac {\epsilon - \hat \epsilon} \epsilon$, где $\hat \epsilon$ - ваша экспериментальная оценка false positive rate. В качестве объектов используйте строки.

Для начала импортируем библиотеку bitarray, и прочие вспомогательные библиотеки.

In [2]:
import bitarray
import math

Создадим битовый массив, приспособленный для небольшого количества объектов, для тестирования функционала. Выберем количество объектов, и посчитаем оптимальный размер и количество хэш функций для заданного false positive rate'а.

In [3]:
OBJECTS_COUNT = 10
FALSE_POSITIVE_RATE = 0.05


bitarray_length = int(( 1 / (math.pow(math.log(2),2)) ) * OBJECTS_COUNT * (math.log(1 / FALSE_POSITIVE_RATE)))
hash_count = int((bitarray_length / OBJECTS_COUNT) * math.log(2))

Посмотрим на результат.

In [4]:
f"Длина массива : {bitarray_length}, количество хэш функций: {hash_count}"  

'Длина массива : 62, количество хэш функций: 4'

Инициализируем массив

In [5]:
bit_array_test = bitarray.bitarray(bitarray_length)

Проверим.

In [6]:
bit_array_test

bitarray('00000000000110001011011111100101101001010000000100000000000000')

Обнулим массив, для дальнейшего тестирования.

In [7]:
bit_array_test.setall(False)

Посмотрим, что получилось

In [8]:
bit_array_test

bitarray('00000000000000000000000000000000000000000000000000000000000000')

Для хэширования используем murmurhash3 (библиотека mmh3).

In [9]:
import mmh3

Проверим сгенерированный хеш.

In [10]:
TEST_STRING = 'foo'

mmh3.hash(TEST_STRING)

-156908512

Мы видим, что результатом может являться любое 32-битное число, так что нам нужно спроецировать результаты хеширования на интервал, заданный размером фильтра. 

Для начала сгенерируем положительный хэш.

In [11]:
test_unsigned_hash = mmh3.hash(TEST_STRING, signed = False)

Далее спроецируем его на интервал (предполагая что распределение мурмурхэша примерно равномерное). Для этого напишем функцию проецирования.

In [12]:
def project_hash_to_buckets(hash_result, bucket_count):
    percentage = hash_result / float(2**32)
    for bucket in range (0,  bucket_count):
        if bucket / float(bucket_count) <= percentage and (bucket+1) / float(bucket_count) > percentage:
            return bucket
    return bucket_count

Проверим.

In [13]:
project_hash_to_buckets(test_unsigned_hash, bitarray_length)

59

Затем напишем функцию для генерации нескольких результатов хэш функций. Для простоты не будем сохранять сид функции, а просто сделаем его из range'а. Так что фактически сид будет каждый раз генерироваться заново, но зато не придётся выделять память под ещё одну структуру для хранения сидов. Возможно умнее было бы иметь рандомизированый сид, но мне как-то не хочется.

In [14]:
def get_multiple_hashes(hash_count, string_to_hash):
    hashes = []
    for seed in range(hash_count):
        hashes.append(mmh3.hash(string_to_hash, seed, signed=False))
    return hashes

Проверим.

In [15]:
get_multiple_hashes(hash_count, TEST_STRING)

[4138058784, 884891506, 546078562, 2419143978]

Теперь напишем функцию, которая объединит предыдущие.

In [16]:
def get_multiple_hash_buckets(hash_count, string_to_hash, bucket_count):
    hashes = get_multiple_hashes(hash_count, string_to_hash)
    hash_buckets = [project_hash_to_buckets(hash_result, bucket_count)for hash_result in hashes]
    return hash_buckets

Проверим, правильно ли работает.

In [17]:
get_multiple_hash_buckets(hash_count, TEST_STRING, bitarray_length)

[59, 12, 7, 34]

Теперь нужно начать реализовывать собственно операцию добавления в блум фильтр. Думаю, что умнее было бы реализовывать их через побитовые операции, но как-то не захотелось -- я не гонюсь за скоростью особо.

In [18]:
def add(bit_array, string):
    hash_buckets = get_multiple_hash_buckets(hash_count, string, bitarray_length)
    for hash_bucket in hash_buckets:
        bit_array[hash_bucket] = True
    return bit_array

Проверим

In [19]:
add(bit_array_test, TEST_STRING)

bitarray('00000001000010000000000000000000001000000000000000000000000100')

Ну и следовательно в таком же стиле реализуем операцию query.

In [20]:
def query(bit_array, string):
    hash_buckets = get_multiple_hash_buckets(hash_count, string, bitarray_length)
    for hash_bucket in hash_buckets:
        if not bit_array[hash_bucket]:
            return False
    return True

Проверим!

In [21]:
query(bit_array_test, TEST_STRING)

True

In [22]:
query(bit_array_test, TEST_STRING+'a')

False

Отлично. Теперь можно начать всю конструкцию запаковывать в блум фильтр.

In [23]:
class BloomFilter:
    
    def __init__(self, objects_count, false_positive_rate):
        self.objects_count = objects_count
        self.false_positive_rate = false_positive_rate
        
        self.bitarray_length = int(( 1 / (math.pow(math.log(2),2)) ) * self.objects_count * (math.log(1 / self.false_positive_rate)))
        self.hash_count = int((self.bitarray_length / self.objects_count) * math.log(2))
        
        self.bitarray_data = bitarray.bitarray(self.bitarray_length)
        self.bitarray_data.setall(False)
    
    def _project_hash_to_buckets(self, hash_result):
        percentage = hash_result / float(2**32)
        for bucket in range (0,  self.bitarray_length):
            if bucket / float(self.bitarray_length) <= percentage and (bucket+1) / float(self.bitarray_length) > percentage:
                return bucket
        return self.bitarray_length
    
    def _get_multiple_hashes(self, string_to_hash):
        hashes = []
        for seed in range(self.hash_count):
            hashes.append(mmh3.hash(string_to_hash, seed, signed=False))
        return hashes
    
    def _get_multiple_hash_buckets(self, string_to_hash):
        hashes = self._get_multiple_hashes(string_to_hash)
        hash_buckets = [self._project_hash_to_buckets(hash_result)for hash_result in hashes]
        return hash_buckets
    
    def insert(self, string):
        hash_buckets = self._get_multiple_hash_buckets(string)
        for hash_bucket in hash_buckets:
            self.bitarray_data[hash_bucket] = True

    def lookup(self, string):
        hash_buckets = self._get_multiple_hash_buckets(string)
        for hash_bucket in hash_buckets:
            if not self.bitarray_data[hash_bucket]:
                return False
        return True
    

Протестируем функционал.

In [24]:
test_bloom = BloomFilter(OBJECTS_COUNT, FALSE_POSITIVE_RATE)
test_bloom.insert(TEST_STRING)
test_bloom.lookup(TEST_STRING), test_bloom.bitarray_data

(True,
 bitarray('00000001000010000000000000000000001000000000000000000000000100'))

 Для генерации рандомных строк импортируем библиотеку uuid.

In [45]:
import uuid

Теперь создадим функцию, генерирующую рандомные объекты, для тестирования.

In [46]:
def generate_random_sequences(count):
    sequences = []
    for i in range(count):
        sequences.append(str(uuid.uuid4())[:8])
    return sequences

Протестируем же её.

In [47]:
generate_random_sequences(10)

['24daa4fd',
 'd98e9737',
 'd0f803a9',
 '54314639',
 '1683b0f4',
 '52da1c80',
 '49778c7b',
 'c2e8ca3c',
 '7fd88c9d',
 'e49a5420']

Отлично, теперь задизайним тест false positive rate'а. Сгенерируем два набора уникальных строк.

In [48]:
COUNT = 1000

random_train = generate_random_sequences(COUNT)
random_test = generate_random_sequences(COUNT)

Инциализируем фильтр, и наполним его первым набором.

In [49]:
bloom_filter = BloomFilter(COUNT, FALSE_POSITIVE_RATE)

for sequence in random_train:
    bloom_filter.insert(sequence)

Проверим, сколько строк из второго набора найдётся в нашем фильтре.

In [50]:
hits = []

for sequence in random_test:
    hits.append(bloom_filter.lookup(sequence))

sum(hits)

59

К сожалению, оказалось что всё работает крайне медленно -- видимо придётся переделывать на побитовые операции.

Напомню, как выглядит моя функция для заполнения.

In [51]:
def add(bit_array, string):
    hash_buckets = get_multiple_hash_buckets(hash_count, string, bitarray_length)
    for hash_bucket in hash_buckets:
        bit_array[hash_bucket] = True
    return bit_array

Хотя сначала проверю, может это хэш виноват?

In [52]:
COUNT = 1000000

random_sequences = generate_random_sequences(COUNT)

In [53]:
%%time

for sequence in random_sequences:
    mmh3.hash(sequence, 42, signed=False)

Wall time: 345 ms


Тогда проверю функцию add

In [54]:
OBJECTS_COUNT = 2000
FALSE_POSITIVE_RATE = 0.05


bitarray_length = int(( 1 / (math.pow(math.log(2),2)) ) * OBJECTS_COUNT * (math.log(1 / FALSE_POSITIVE_RATE)))
hash_count = int((bitarray_length / OBJECTS_COUNT) * math.log(2))

In [55]:
test_bit_array = bitarray.bitarray(bitarray_length)
test_bit_array.setall(False)


random_sequences = generate_random_sequences(OBJECTS_COUNT)

In [56]:
%%time

for sequence in random_sequences:
    add(test_bit_array, sequence)

Wall time: 16.3 s


Можно увидеть, что уже для 2000 объектов время -- 16 секунд!

Поэтому очевидно надо менять алгоритм функции add.

Экспериментально было выяснено, что проблема в проекции хэша на интервал -- я решил использовать нампи для быстрого решения задачи поиска нужного бакета.

Напомню, как выглядит функция проекции хэша на интервал.

In [57]:
def project_hash_to_buckets(hash_result, bucket_count):
    percentage = hash_result / float(2**32)
    for bucket in range (0,  bucket_count):
        if bucket / float(bucket_count) <= percentage and (bucket+1) / float(bucket_count) > percentage:
            return bucket
    return bucket_count

In [58]:
project_hash_to_buckets(test_unsigned_hash, bitarray_length)

12014

Импортирую библиотеку нампи.

In [59]:
import numpy as np

Создаю новый векторизованный алгоритм.

In [60]:
def project_hash_to_buckets_vectorized(hash_result, bucket_count):
    percentage = hash_result / float(2**32)
    buckets = np.arange(bucket_count)
    buckets_percentages = buckets/bucket_count
    closest_bucket_idx = (np.abs(buckets_percentages / percentage - 1)).argmin()
    return buckets[closest_bucket_idx]

Проверим его.

In [61]:
project_hash_to_buckets_vectorized(test_unsigned_hash, bitarray_length)

12014

Теперь проверим намного ли он быстрее

In [68]:
OBJECTS_COUNT = 1000
FALSE_POSITIVE_RATE = 0.05


bitarray_length = int(( 1 / (math.pow(math.log(2),2)) ) * OBJECTS_COUNT * (math.log(1 / FALSE_POSITIVE_RATE)))
hash_count = int((bitarray_length / OBJECTS_COUNT) * math.log(2))

random_sequences = generate_random_sequences(OBJECTS_COUNT)

hashes = []
for sequence in random_sequences:
    hashes.append(mmh3.hash(sequence, 42, signed=False))
    
    

In [69]:
%%time

for hash_string in hashes:
    project_hash_to_buckets(hash_string, bitarray_length)

Wall time: 1.01 s


Достаточно. Теперь заменим алгоритм в классе блум фильтра

In [70]:
class BloomFilter:
    
    def __init__(self, objects_count, false_positive_rate):
        self.objects_count = objects_count
        self.false_positive_rate = false_positive_rate
        
        self.bitarray_length = int(( 1 / (math.pow(math.log(2),2)) ) * self.objects_count * (math.log(1 / self.false_positive_rate)))
        self.hash_count = int((self.bitarray_length / self.objects_count) * math.log(2))
        
        self.bitarray_data = bitarray.bitarray(self.bitarray_length)
        self.bitarray_data.setall(False)
    
    def _project_hash_to_buckets(self, hash_result):
        percentage = hash_result / float(2**32)
        buckets = np.arange(self.bitarray_length)
        buckets_percentages = buckets/self.bitarray_length
        closest_bucket_idx = (np.abs(buckets_percentages / percentage - 1)).argmin()
        return buckets[closest_bucket_idx]    
    
    def _get_multiple_hashes(self, string_to_hash):
        hashes = []
        for seed in range(self.hash_count):
            hashes.append(mmh3.hash(string_to_hash, seed, signed=False))
        return hashes
    
    def _get_multiple_hash_buckets(self, string_to_hash):
        hashes = self._get_multiple_hashes(string_to_hash)
        hash_buckets = [self._project_hash_to_buckets(hash_result)for hash_result in hashes]
        return hash_buckets
    
    def insert(self, string):
        hash_buckets = self._get_multiple_hash_buckets(string)
        for hash_bucket in hash_buckets:
            self.bitarray_data[hash_bucket] = True

    def lookup(self, string):
        hash_buckets = self._get_multiple_hash_buckets(string)
        for hash_bucket in hash_buckets:
            if not self.bitarray_data[hash_bucket]:
                return False
        return True

Протестируем его

In [71]:
OBJECTS_COUNT = 10

test_bloom = BloomFilter(OBJECTS_COUNT, FALSE_POSITIVE_RATE)
test_bloom.insert(TEST_STRING)
test_bloom.lookup(TEST_STRING), test_bloom.bitarray_data

(True,
 bitarray('00000000100001000000000000000000000100000000000000000000000010'))

In [72]:
TEST_COUNT = 1000000

random_train = generate_random_sequences(TEST_COUNT)
random_test = generate_random_sequences(TEST_COUNT)

In [None]:
%%time
bloom_filter = BloomFilter(TEST_COUNT, FALSE_POSITIVE_RATE)

for sequence in random_train:
    bloom_filter.insert(sequence)

In [None]:
hits = []

for sequence in random_test:
    hits.append(bloom_filter.lookup(sequence))

sum(hits)