In [1]:
import os
import psutil
import pyarrow.parquet as pq
import pyarrow as pa
import ray
from ray import remote
import hashlib

In [2]:
# Drop any Ray runtime left over from a previous execution so that re-running
# this cell starts from a clean state.
ray.shutdown()
# Start a local Ray instance. To attach to a remote cluster instead, use
# ray.init(address='ray://localhost:10001').
ray_context = ray.init()
ray_context

2023-02-07 17:08:15,848	INFO worker.py:1529 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265


0,1
Python version:,3.10.9
Ray version:,2.2.0
Dashboard:,http://127.0.0.1:8265


In [3]:
@remote
class BloomSet:
    """Ray actor holding an exact set of strings, bucketed by SHA-256 prefix.

    Each inserted string is hashed with SHA-256. The first ``hash_truncation``
    hex characters of the digest select a bucket in ``self.db``, and the full
    hex digest is stored in that bucket's list. Because full digests are kept,
    membership answers are exact (barring SHA-256 collisions).
    """

    def __init__(self, hash_truncation):
        # digest prefix -> list of full hex digests sharing that prefix
        self.db = {}
        self.hash_truncation = hash_truncation
        # Arrow table built by save1() and consumed by save2().
        self.pt = None

    def sha256(self, s):
        """Return the hex SHA-256 digest of ``s`` (encoded as UTF-8)."""
        return hashlib.sha256(s.encode()).hexdigest()

    def add_to_set(self, s: str):
        """Insert ``s``; return True if it was already present, else False."""
        digest = self.sha256(s)
        bucket = self.db.setdefault(digest[:self.hash_truncation], [])
        if digest in bucket:
            return True
        bucket.append(digest)
        return False

    def _to_table(self):
        """Build a two-column Arrow table: bucket prefix and its digest list."""
        # One row per bucket. A column-per-bucket layout (from_pydict(self.db))
        # fails whenever buckets hold differing numbers of digests, because
        # all Arrow columns in a table must have the same length.
        return pa.Table.from_pydict({
            'index': list(self.db.keys()),
            'hash_list': list(self.db.values()),
        })

    def save(self, file_name):
        """Write the set to a Parquet file at ``file_name``.

        Returns:
            (m1, m2, m3): process RSS in MB before building the Arrow table,
            after building it, and after writing the file.
        """
        m1 = self.get_memory_footprint()
        table = self._to_table()
        m2 = self.get_memory_footprint()
        # Use the explicitly imported pyarrow.parquet module; the
        # ``pa.parquet`` attribute may be missing in a worker process that
        # never imported the submodule.
        pq.write_table(table, file_name)
        m3 = self.get_memory_footprint()
        return (m1, m2, m3)

    def save1(self):
        """First phase of a two-step save: build and keep the Arrow table.

        Returns:
            (m1, m2): process RSS in MB before and after building the table.
        """
        m1 = self.get_memory_footprint()
        self.pt = self._to_table()
        m2 = self.get_memory_footprint()
        return (m1, m2)

    def save2(self, file_name):
        """Second phase: write the table built by save1() to ``file_name``.

        Returns:
            Process RSS in MB after writing.

        Raises:
            RuntimeError: if save1() has not been called first.
        """
        if self.pt is None:
            raise RuntimeError('save1() must be called before save2()')
        pq.write_table(self.pt, file_name)
        return self.get_memory_footprint()

    def get_memory_footprint(self):
        """Return this process's resident set size in megabytes (RSS / 1024**2)."""
        process = psutil.Process(os.getpid())
        return process.memory_info().rss / 1024 / 1024

In [4]:
@remote
class BloomSet:
    """Ray actor holding an exact set of strings, bucketed by SHA-256 prefix.

    Buckets are stored as two parallel lists so they can be written straight
    to Parquet. ``db_index[i]`` is a digest prefix of length
    ``hash_truncation``, and ``db[i]`` is the list of full hex digests that
    share that prefix.
    """

    def __init__(self, hash_truncation):
        self.db_index = []
        self.db = []
        self.hash_truncation = hash_truncation
        # prefix -> position in db_index/db. Replaces the linear ``in`` test
        # and list.index() scans, which made n inserts O(n^2) overall.
        self._positions = {}

    def sha256(self, s):
        """Return the hex SHA-256 digest of ``s`` (encoded as UTF-8)."""
        return hashlib.sha256(s.encode()).hexdigest()

    def add_to_set(self, s: str):
        """Insert ``s``; return True if it was already present, else False."""
        digest = self.sha256(s)
        key = digest[:self.hash_truncation]
        pos = self._positions.get(key)
        if pos is None:
            # New bucket: record its position before appending to both lists.
            self._positions[key] = len(self.db_index)
            self.db_index.append(key)
            self.db.append([digest])
            return False
        bucket = self.db[pos]
        if digest in bucket:
            return True
        bucket.append(digest)
        return False

    def save(self, file_name):
        """Write the buckets to a Parquet file with columns 'index' and 'hash_list'.

        Returns:
            (m1, m2, m3): process RSS in MB before building the Arrow table,
            after building it, and after writing the file.
        """
        names = ['index', 'hash_list']

        m1 = self.get_memory_footprint()
        table = pa.Table.from_arrays([pa.array(self.db_index), pa.array(self.db)], names=names)

        m2 = self.get_memory_footprint()
        # Use the explicitly imported pyarrow.parquet module; the
        # ``pa.parquet`` attribute may be missing in a worker process that
        # never imported the submodule.
        pq.write_table(table, file_name)

        m3 = self.get_memory_footprint()
        return (m1, m2, m3)

    def get_memory_footprint(self):
        """Return this process's resident set size in megabytes (RSS / 1024**2)."""
        process = psutil.Process(os.getpid())
        return process.memory_info().rss / 1024 / 1024

In [5]:
# Create one actor; buckets are keyed by the first 4 hex characters of each
# SHA-256 digest.
bs = BloomSet.remote(hash_truncation=4)


In [6]:
# On the fresh actor 'test1' is not stored yet, so add_to_set returns False.
insert_ref = bs.add_to_set.remote('test1')
ray.get(insert_ref)

False

In [7]:
# 'test2' has a different digest, so it is also reported as new (False).
pending_ref = bs.add_to_set.remote('test2')
ray.get(pending_ref)

False

In [8]:
# Inserting 'test1' a second time finds its digest already stored -> True.
repeat_ref = bs.add_to_set.remote('test1')
ray.get(repeat_ref)

True

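Test memory consumption. Result:
Inserting 10k keys into the BloomSet raised memory from 115.18 MB to 117.93 MB, i.e. 2.75 MB / 10,000 ≈ 288 bytes per entry.
Note: the cell below inserts 100k keys, and its samples are the actor process's total RSS, so they are not directly comparable to this figure.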

In [9]:
# Memory-growth experiment: insert N_KEYS distinct strings and sample the
# actor process's RSS (MB) every REPORT_EVERY inserts.
N_KEYS = 100_000
REPORT_EVERY = 10_000

for i in range(N_KEYS):
    # The returned ObjectRef is discarded; only the side effect matters here.
    bs.add_to_set.remote(f'test{i}')
    if i % REPORT_EVERY == 0:
        # Relies on Ray running this caller's tasks on the actor in submission
        # order, so the reading covers every insert submitted so far.
        print(ray.get(bs.get_memory_footprint.remote()))

# The in-loop samples stop before the final batch of inserts; take one more
# reading so the full N_KEYS inserts are reflected.
print(ray.get(bs.get_memory_footprint.remote()))

111.1953125




238.33203125
267.0390625
278.69140625
285.15625
282.3125
292.46875
297.51953125
298.6484375
301.55859375


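Inserted 10k keys into the BloomSet: memory went up from 115.18 MB to 117.93 MB, i.e. approx. 288 bytes per entry. (This figure comes from a 10k-key run; in the 100k-key run above, total process RSS grew from ≈111 MB to ≈302 MB.)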

In [10]:
# save() runs inside the actor, so this path is resolved on the actor's node.
# The result is (RSS before building the table, after building it,
# after writing the file), in MB.
DB_PATH = '/tmp/db.parquet'
ray.get(bs.save.remote(DB_PATH))

(297.5546875, 312.5625, 319.60546875)