In [1]:
import math
import pandas as pd
import base64
from bitarray import bitarray
import mmh3
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

In [2]:
class Bloom_Filter():
    def __init__(self, items_count, FP_probability, size, hash_count):
        self.items_count = items_count
        self.FP_probability = FP_probability
        self.size = size
        self.hash_count = hash_count
        self.bit_array = bitarray(self.size)
        self.bit_array.setall(0)

    def add(self, item):
        try:
            List = [mmh3.hash(item, i) % self.size for i in range(self.hash_count)]
            for i in List:
                self.bit_array[i] = True
            return True
        except:
            return False

    def lookup(self, item):
        # print("[INFO]. Type : ", type(item))
        flag = False
        if type(item) == list:
            for it in item:
                for i in range(self.hash_count):
                    digest = mmh3.hash(it, i) % self.size
                    if self.bit_array[digest] == True:
                        flag = True
        return flag

In [3]:
AFINN = pd.read_csv("AFINN-111.txt", sep="\t", header=None).drop_duplicates()
AFINN = pd.concat([AFINN[AFINN[1] == -4], AFINN[AFINN[1] == -5]], axis=0).set_index(0).T.to_dict('list')

items_count = len(AFINN)
FP_probability = 0.001

size = int(-(items_count * math.log(FP_probability))/(math.log(2)**2))
hash_count = int((size / items_count) * math.log(2))

bloom_obj = Bloom_Filter(items_count, FP_probability, size, hash_count)
added_status = [bloom_obj.add(key) for key in AFINN.keys()]

with open("./bloom.txt", "wb") as bloom_file:
    bloom_file.write(base64.b64encode(bloom_obj.bit_array))

In [4]:
sc = SparkContext.getOrCreate()
ssc = StreamingContext(sc, 1)
ssc.checkpoint("./checkpoint")

headline = ssc.socketTextStream("localhost", 9999).window(10, 10).map(lambda line: line.split(' ')) #.map(lambda x: x.lower())
# headline.pprint()

filtered_bad_headline = headline.filter(bloom_obj.lookup).map(lambda list_of_words: " ".join(list_of_words))
filtered_bad_headline.pprint()

22/11/21 22:43:47 WARN Utils: Your hostname, samay resolves to a loopback address: 127.0.1.1; using 10.0.0.101 instead (on interface wlo1)
22/11/21 22:43:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/11/21 22:43:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [5]:
ssc.start()

[Stage 0:>                                                          (0 + 0) / 1][Stage 0:>                                                          (0 + 1) / 1]22/11/21 22:44:01 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
22/11/21 22:44:01 WARN BlockManager: Block input-0-1669088641600 replicated to only 0 peer(s) instead of 1 peers
[Stage 0:>                  (0 + 1) / 1][Stage 1:>                  (0 + 1) / 1]                                                                                

-------------------------------------------
Time: 2022-11-21 22:44:07
-------------------------------------------
A new guy running for mayor reveals his fuck ups accidentally. 
dr mitchell rosenthal phoenix house founder dies 87
elizabeth holmes is sentenced more than 11 years theranos fraud
elizabeth holmes be sentenced fraud trial
study finds hempfed cows had traces thc their milk
deforestation brings batborne virus home roost
nan goldin laura poitras two artists one devastating film
people with adhd face withdrawal adderall shortage continues
uk pledges 119 billion fight diseases poor countries
world population reaches 8 billion un says
...



[Stage 0:>                                                          (0 + 1) / 1]

In [6]:
ssc.stop()

-------------------------------------------
Time: 2022-11-21 22:44:17
-------------------------------------------



22/11/21 22:44:17 WARN SocketReceiver: Error receiving data
java.net.SocketException: Socket closed
	at java.net.SocketInputStream.socketRead0(Native Method)
	at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
	at java.net.SocketInputStream.read(SocketInputStream.java:171)
	at java.net.SocketInputStream.read(SocketInputStream.java:141)
	at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
	at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
	at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
	at java.io.InputStreamReader.read(InputStreamReader.java:184)
	at java.io.BufferedReader.fill(BufferedReader.java:161)
	at java.io.BufferedReader.readLine(BufferedReader.java:324)
	at java.io.BufferedReader.readLine(BufferedReader.java:389)
	at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.getNext(SocketInputDStream.scala:121)
	at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.getNext(SocketInputDStream.scala:119)
	at org.apache.spar