# Project MMDB
Authors:
- Nazarii Drushchak
- Igor Babin
- Uliana Zbezhkhovska

- Treat all changes made to Wikipedia as a stream.
    - See: https://wikitech.wikimedia.org/wiki/RCStream
- Each action is received in JSON format.
- The data is full of bots. There is a flag with which programmers can indicate that an action was performed by a bot.
- Using this flag as ground truth, develop a system able to classify users as bot or human.
- Constraint: you must sample the stream and use only 20% of the data.
- Describe the distribution of edits per user and per bot.
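
The cells below sample by keeping every fifth event. As an alternative sketch (not the approach used in this notebook), the 20% sample can be taken per user by hashing the user name, so that every edit of a sampled user is kept and per-user edit counts stay complete. The function name `keep_user` and the example events are hypothetical.

```python
import hashlib

def keep_user(user: str, buckets: int = 5) -> bool:
    """Return True for roughly 1 in `buckets` users, always the same ones."""
    digest = hashlib.md5(user.encode("utf-8")).digest()
    # Interpret the first 8 bytes as an integer and assign the user to a bucket.
    return int.from_bytes(digest[:8], "big") % buckets == 0

# Hypothetical events shaped like the 'user' and 'bot' fields of the stream.
events = [
    {"user": "ExampleBot", "bot": True},
    {"user": "ExampleHuman", "bot": False},
    {"user": "ExampleBot", "bot": True},
]

# Only events from users that fall into bucket 0 are kept.
sampled = [change for change in events if keep_user(change["user"])]
print(f"Kept {len(sampled)} of {len(events)} events")
```

Because the decision depends only on the user name, the same user is either always kept or always dropped, which every-fifth-event sampling does not guarantee.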

In [1]:
import json
from sseclient import SSEClient as EventSource

In [2]:
url = 'https://stream.wikimedia.org/v2/stream/recentchange'

In [3]:
counter = 0
maxEvents = 100  # the loop below stops after maxEvents + 1 events have been received

bot = 0
non_bot = 0

for event in EventSource(url):
    if event.event == 'message':
        try:
            change = json.loads(event.data)
        except ValueError:
            continue
      
        # Keep every 5th event: a 20% systematic sample of the stream
        if counter % 5 == 0:

            if not change['bot']:
                non_bot += 1
                print("User '{user}' isn't a bot".format(**change))
            else:
                bot += 1
                print("User '{user}' is a bot".format(**change))

        counter += 1

        if counter > maxEvents:
            break

print('\n\n')
print(f"Total Bot Edits: {bot}")
print(f"Total Non-Bot Edits: {non_bot}")


User 'Gzen92Bot' is a bot
User 'Aidas' isn't a bot
User 'Lutheraner' isn't a bot
User 'Luca.favorido' isn't a bot
User 'Manuele9866' isn't a bot
User 'AgainErick' isn't a bot
User 'Marchjuly' isn't a bot
User '円周率３パーセント' isn't a bot
User '213.55.226.65' isn't a bot
User 'Higa4' isn't a bot
User '213.55.226.65' isn't a bot
User '2001:B011:8007:10C4:5131:1454:AE5D:B376' isn't a bot
User 'Bot-Jagwar' is a bot
User '190.21.247.59' isn't a bot
User 'AgainErick' isn't a bot
User '95.237.7.141' isn't a bot
User 'GeographBot' is a bot
User 'KrBot' is a bot
User 'Fuzheado' isn't a bot
User 'Escargot mécanique' is a bot
User 'AgainErick' isn't a bot
Total Bot Edits: 5
Total Non-Bot Edits: 16
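
The totals above count edits, not users. A minimal sketch of how the distribution of edits per user could be described, assuming the sampled events have been collected into a list of dicts with `user` and `bot` keys. The helper names `edit_distribution` and `summarize` and the example events are hypothetical.

```python
from collections import Counter

def edit_distribution(events):
    """Count edits per user, split by the 'bot' flag."""
    bot_edits, human_edits = Counter(), Counter()
    for change in events:
        target = bot_edits if change.get("bot") else human_edits
        target[change["user"]] += 1
    return bot_edits, human_edits

def summarize(counts):
    """Return (number of users, total edits, mean edits per user, max edits)."""
    if not counts:
        return 0, 0, 0.0, 0
    total = sum(counts.values())
    return len(counts), total, total / len(counts), max(counts.values())

# Hypothetical sampled events.
events = [
    {"user": "ExampleBot", "bot": True},
    {"user": "ExampleBot", "bot": True},
    {"user": "HumanA", "bot": False},
    {"user": "HumanB", "bot": False},
    {"user": "HumanA", "bot": False},
]

bot_edits, human_edits = edit_distribution(events)
print("Bots  :", summarize(bot_edits))
print("Humans:", summarize(human_edits))
print("Most active humans:", human_edits.most_common(3))
```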


## Train the Bloom filter

Finally, train a Bloom filter that filters bots out of the stream.
- Find parameters for the Bloom filter that keep the error rate below 10%.
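
As background for the parameter search, the sketch below applies the textbook sizing formulas for a classic Bloom filter: for `n` expected items and target false positive probability `p`, the bit array needs about `m = -n ln(p) / (ln 2)^2` bits and `k = (m / n) ln 2` hash functions. This is an illustration of the standard formulas only; `pybloom_live` may size its filters differently internally, and the function name `bloom_parameters` is hypothetical.

```python
import math

def bloom_parameters(capacity: int, error_rate: float):
    """Return (bits, hash functions) for a classic Bloom filter."""
    # m = -n * ln(p) / (ln 2)^2, rounded up to a whole number of bits
    bits = math.ceil(-capacity * math.log(error_rate) / math.log(2) ** 2)
    # k = (m / n) * ln 2, rounded to the nearest integer, at least 1
    hashes = max(1, round(bits / capacity * math.log(2)))
    return bits, hashes

# Same candidate grid as in the search below.
for capacity in [100, 1000, 10000]:
    for error_rate in [0.001, 0.01, 0.1]:
        bits, hashes = bloom_parameters(capacity, error_rate)
        print(f"capacity={capacity:>5} error_rate={error_rate:<5} bits={bits:>6} hashes={hashes}")
```

A lower target error rate costs more bits per stored user, so the choice is a trade-off between memory and wrongly flagged humans.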

In [34]:
from pybloom_live import BloomFilter

maxEvents = 100  # each candidate filter sees maxEvents + 1 events

best_false_positive_rate_actual = 0.1
best_capacity = 0
best_false_positive_rate = 0

# Iterate through different Bloom Filter parameters
for candidate_capacity in [100, 1000, 10000]:
    for candidate_false_positive_rate in [0.001, 0.01, 0.1]:
        # Initialize the Bloom Filter with candidate parameters
        bloom_filter = BloomFilter(candidate_capacity, candidate_false_positive_rate)

        # Events received for this candidate, and a running event counter
        changes = []
        counter = 0
       
        for event in EventSource(url):
            if event.event == 'message':
                try:
                    change = json.loads(event.data)
                except ValueError:
                    continue

                if counter % 5 == 0:
                    # Train the Bloom filter only on the 20% sample, and only with bot users
                    if change['bot']:
                        bloom_filter.add(change['user'])
                changes.append(change)  # Accumulate changes for future analysis of users
                counter += 1

                if counter > maxEvents:
                    break

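        # Share of all received events whose user the filter reports as a bot.
        # This count also includes genuine bots, so it is the flagged fraction
        # of the stream, not a false positive rate in the strict sense.
        false_positive_rate_actual = len([c for c in changes if c['user'] in bloom_filter]) / len(changes)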
        if false_positive_rate_actual < best_false_positive_rate_actual:
            best_false_positive_rate_actual = false_positive_rate_actual
            best_capacity = candidate_capacity
            best_false_positive_rate = candidate_false_positive_rate
        changes = []
        counter = 0
        
# Print the best parameters and their actual False Positive Rate
print(f"Best Bloom Filter Parameters: Estimated Capacity={best_capacity}, Desired Error Rate={best_false_positive_rate}")
print(f"Actual False Positive Rate for Bloom filter with best parameters: {best_false_positive_rate_actual:.2%}")


Best Bloom Filter Parameters: Estimated Capacity=1000, Desired Error Rate=0.001
Actual False Positive Rate for Bloom filter with best parameters: 6.93%
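
The percentage above is computed over all received events, bots included. A stricter measurement restricts the denominator to events flagged as human by the ground-truth `bot` field. The sketch below is one way this could be done, assuming the filter supports the `in` operator (as `BloomFilter` does). A plain `set` stands in for the filter here, and the function name and example events are hypothetical.

```python
def false_positive_rate(events, bot_filter):
    """Share of human events whose user the filter wrongly reports as a bot."""
    humans = [change for change in events if not change["bot"]]
    if not humans:
        return 0.0
    flagged = sum(1 for change in humans if change["user"] in bot_filter)
    return flagged / len(humans)

# Hypothetical events and a stand-in filter that wrongly contains one human.
events = [
    {"user": "ExampleBot", "bot": True},
    {"user": "HumanA", "bot": False},
    {"user": "HumanB", "bot": False},
    {"user": "HumanC", "bot": False},
    {"user": "HumanD", "bot": False},
]
bot_filter = {"ExampleBot", "HumanB"}

print(f"False positive rate: {false_positive_rate(events, bot_filter):.2%}")
```

Note that a user who edits both with and without the bot flag would count as a false positive under this definition.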


## Spark Streaming
To reach 100% of the grade, the following is also required:
- Make the system work with Spark Streaming (5%)

In [38]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext



In [50]:
# Create a SparkContext
sc = SparkContext('local[*]')

# Create a StreamingContext with a batch interval of 10 seconds
ssc = StreamingContext(sc, 10)

In [51]:

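# Function to process each batch of events.
# Note: the rdd argument is not used. Each call opens a new connection to the
# Wikimedia stream for every candidate (9 per batch), and the EventSource(url)
# call is where the 429 error in the output below is raised.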
def process_batch(rdd):
    # Initialize variables to track the best parameters
    best_capacity = 0
    best_false_positive_rate = 0
    best_false_positive_rate_actual = 0.1  
    maxEvents = 50
    # Iterate through different Bloom Filter parameters
    for candidate_capacity in [100, 1000, 10000]:
        for candidate_false_positive_rate in [0.001, 0.01, 0.1]:
            # Initialize the Bloom Filter with candidate parameters
            bloom_filter = BloomFilter(candidate_capacity, candidate_false_positive_rate)

            # Events received for this candidate, and a running event counter
            changes = []
            counter = 0

            for event in EventSource(url):
                if event.event == 'message':
                    try:
                        change = json.loads(event.data)
                    except ValueError:
                        continue

                    if counter % 5 == 0:
                        # Train the Bloom filter only on the 20% sample, and only with bot users
                        if change['bot']:
                            bloom_filter.add(change['user'])
                    changes.append(change)  # Accumulate changes for future analysis of users
                    counter += 1

                    if counter > maxEvents:
                        break

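            # Share of all received events whose user the filter reports as a bot.
            # This count also includes genuine bots, so it is the flagged fraction
            # of the stream, not a false positive rate in the strict sense.
            false_positive_rate_actual = len([c for c in changes if c['user'] in bloom_filter]) / len(changes)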
            if false_positive_rate_actual < best_false_positive_rate_actual:
                best_false_positive_rate_actual = false_positive_rate_actual
                best_capacity = candidate_capacity
                best_false_positive_rate = candidate_false_positive_rate
            changes = []
            counter = 0
            
    # Print the best parameters and their actual False Positive Rate
    print(f"Best Bloom Filter Parameters: Estimated Capacity={best_capacity}, Desired Error Rate={best_false_positive_rate}")
    print(f"Actual False Positive Rate for Bloom filter with best parameters: {best_false_positive_rate_actual:.2%}")


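# Create a DStream that reads text lines from a socket on localhost:9999
# (the Wikimedia events would have to be forwarded to this socket by a separate process)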
dstream = ssc.socketTextStream('localhost', 9999)

# Process each batch of events
dstream.foreachRDD(process_batch)

# Start the Spark Streaming context
ssc.start()
ssc.awaitTermination()


Best Bloom Filter Parameters: Estimated Capacity=1000, Desired Error Rate=0.1
Actual False Positive Rate for Bloom filter with best parameters: 1.96%
Best Bloom Filter Parameters: Estimated Capacity=100, Desired Error Rate=0.001
Actual False Positive Rate for Bloom filter with best parameters: 5.88%


Py4JJavaError: An error occurred while calling o16455.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "C:\anaconda3\lib\site-packages\pyspark\streaming\util.py", line 71, in call
    r = self.func(t, *rdds)
  File "C:\anaconda3\lib\site-packages\pyspark\streaming\dstream.py", line 236, in func
    return old_func(rdd)  # type: ignore[call-arg, arg-type]
  File "C:\Users\Уляна\AppData\Local\Temp\ipykernel_23136\3241030011.py", line 18, in process_batch
    for event in EventSource(url):
  File "C:\anaconda3\lib\site-packages\sseclient.py", line 48, in __init__
    self._connect()
  File "C:\anaconda3\lib\site-packages\sseclient.py", line 63, in _connect
    self.resp.raise_for_status()
  File "C:\anaconda3\lib\site-packages\requests\models.py", line 1021, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 429 Client Error: Too Many Requests for url: https://stream.wikimedia.org/v2/stream/recentchange

	at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
	at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
	at org.apache.spark.streaming.api.python.PythonDStream$.$anonfun$callForeachRDD$1(PythonDStream.scala:179)
	at org.apache.spark.streaming.api.python.PythonDStream$.$anonfun$callForeachRDD$1$adapted(PythonDStream.scala:179)
	at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$2(ForEachDStream.scala:51)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:417)
	at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$1(ForEachDStream.scala:51)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.$anonfun$run$1(JobScheduler.scala:256)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
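
The 429 response is raised when `process_batch` reconnects to the Wikimedia stream inside every batch. A possible way around it, sketched here under assumptions, is to let each batch work only on the lines Spark already received. This assumes a separate process holds a single connection to the stream and forwards one JSON event per line to the socket. The function name `process_lines` is hypothetical, and a plain `set` stands in for the Bloom filter.

```python
import json

def process_lines(lines, bot_filter):
    """Update the filter with bot users; return (events seen, human events flagged)."""
    seen = flagged = 0
    for line in lines:
        try:
            change = json.loads(line)
        except ValueError:
            continue  # skip malformed lines
        if not isinstance(change, dict) or "user" not in change or "bot" not in change:
            continue  # skip events without the fields we need
        seen += 1
        if change["bot"]:
            bot_filter.add(change["user"])
        elif change["user"] in bot_filter:
            flagged += 1
    return seen, flagged

# Stand-in for one batch of lines read from the socket.
batch = [
    '{"user": "ExampleBot", "bot": true}',
    'not json',
    '{"user": "HumanA", "bot": false}',
    '{"user": "ExampleBot", "bot": false}',
]
bot_filter = set()  # a pybloom_live BloomFilter offers the same add / in interface
result = process_lines(batch, bot_filter)
print(result)

# Assumed hookup in the notebook, executed on the driver for each batch:
# dstream.foreachRDD(lambda rdd: process_lines(rdd.collect(), bloom_filter))
```

With this shape, no network connection is opened inside the batch function, so the number of requests to the stream no longer grows with the number of batches or candidate parameters.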
