In [1]:
import numpy as np
import hashlib
from pyspark.sql.functions import col, from_json
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from scipy.stats import laplace  # Import Laplace distribution for differential privacy
import os
from google.cloud import storage

# Define SimpleFilter class
class SimpleFilter:
    """Minimal Bloom filter for string items.

    Membership is recorded in a fixed-size boolean array. Each item is mapped
    to ``hash_count`` positions; ``add`` sets them and ``check`` reports
    whether all of them are set. Because different items can share positions,
    ``check`` may return True for an item that was never added (a false
    positive), but it never returns False for an item that was added.

    Args:
        size: Number of slots in the bit array; must be positive.
        hash_count: Number of positions probed per item; must be positive.

    Raises:
        ValueError: If ``size`` or ``hash_count`` is not positive.
    """

    def __init__(self, size, hash_count):
        if size <= 0:
            raise ValueError("size must be a positive integer")
        if hash_count <= 0:
            # With zero probes check() would report every item as present.
            raise ValueError("hash_count must be a positive integer")
        self.size = size
        self.hash_count = hash_count
        self.bit_array = np.zeros(size, dtype=np.bool_)

    def _hash(self, item, seed):
        """Return the slot index for ``item`` under the hash selected by ``seed``.

        The seed is mixed into the digested bytes, so each seed yields its own
        position. Adding the seed to a single digest instead would only
        produce neighbouring slots, i.e. effectively one hash function.
        """
        salted = str(seed).encode('utf-8') + b":" + item.encode('utf-8')
        digest = hashlib.md5(salted).digest()
        return int.from_bytes(digest, "big") % self.size

    def add(self, item):
        """Record ``item`` by setting every slot it maps to."""
        for seed in range(self.hash_count):
            self.bit_array[self._hash(item, seed)] = True

    def check(self, item):
        """Return True if ``item`` may have been added, False if it definitely was not."""
        return all(
            self.bit_array[self._hash(item, seed)]
            for seed in range(self.hash_count)
        )

# Build (or reuse) a Spark session for this notebook. The master is "local[*]"
# and the Kafka SQL connector is requested through spark.jars.packages.
spark = (
    SparkSession.builder
    .appName("StreamingWithBloomFilter")
    .master("local[*]")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1")
    .getOrCreate()
)

# Schema applied to the JSON payload of each Kafka record: one field per
# (name, type) pair, in the order listed.
messageSchema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("symbol_id", StringType()),
        ("sequence", StringType()),
        ("time_exchange", TimestampType()),
        ("time_coinapi", TimestampType()),
        ("uuid", StringType()),
        ("price", DoubleType()),
        ("size", DoubleType()),
        ("taker_side", StringType()),
    )
])

# Comma-separated list of Kafka topics to subscribe to
topic_names = "bitstamp_btc_usd,bitfinex_btc_usd,kraken_btc_usd"

# Streaming DataFrame fed by the subscribed topics on the local broker
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", topic_names)
    .load()
)

# Parse the record value as JSON using messageSchema, then flatten the
# resulting struct so each schema field becomes a top-level column
value_df = (
    df
    .select(from_json(col("value").cast("string"), messageSchema).alias("data"))
    .select("data.*")
)

# Initialize the Bloom Filter: 10000 slots, 3 probes per item
bloom_filter = SimpleFilter(10000, 3)

# Local staging folder for per-row CSV files. It can be overridden with the
# CRYPTO_STREAM_LOCAL_DIR environment variable so the notebook is not tied to
# one user's home directory; the default keeps the original location.
local_folder_path = os.environ.get(
    "CRYPTO_STREAM_LOCAL_DIR", "/Users/priyasuresh/googlecloud"
)

# Google Cloud Storage bucket that receives the uploads. It can be overridden
# with the CRYPTO_STREAM_GCS_BUCKET environment variable.
bucket_name = os.environ.get(
    "CRYPTO_STREAM_GCS_BUCKET", "bigdataproject_cryptocurrency"
)

# Function to add Laplacian noise for differential privacy
def add_laplace_noise(data, epsilon=1.0, sensitivity=1):
    """Add Laplace noise to a numeric value, leaving non-numeric values untouched.

    Args:
        data: Value to perturb. Anything ``float()`` accepts is treated as
            numeric; everything else (e.g. a UUID string or ``None``) is
            returned unchanged.
        epsilon: Privacy budget; the noise scale is ``sensitivity / epsilon``.
            Must be positive.
        sensitivity: Sensitivity used to derive the noise scale.

    Returns:
        The noised value formatted as a string when ``data`` is numeric,
        otherwise ``data`` itself.

    Raises:
        ValueError: If ``data`` is numeric and ``epsilon`` is not positive.
    """
    # Only the conversion is guarded. A wider try block would also swallow
    # errors raised while drawing the noise and hand back un-noised data.
    try:
        data_float = float(data)
    except (TypeError, ValueError):
        # TypeError covers None and other non-string, non-numeric inputs.
        return data

    if epsilon <= 0:
        raise ValueError("epsilon must be positive")

    scale = sensitivity / epsilon
    noise = laplace.rvs(scale=scale, size=1)[0]
    return str(data_float + noise)  # callers expect a string back

# Define process_batch function with differential privacy, local saving, and GCS uploading
def process_batch(batch_df, batch_id):
    """Export the not-yet-seen rows of one micro-batch to Google Cloud Storage.

    The batch is collected into pandas. Every row whose UUID the module-level
    ``bloom_filter`` has not recorded is printed, written to a one-record CSV
    under ``local_folder_path``, uploaded with ``upload_to_gcs`` and then
    removed from local disk.

    Behaviour worth knowing:
      * Rows without a UUID are skipped (and counted in a log line) instead of
        aborting the whole batch.
      * De-duplication uses the UUID as received; the value passed through
        ``add_laplace_noise`` is only what gets stored.
      * A UUID is marked as seen only after its upload succeeded, so a failed
        upload does not cause the row to be dropped as a duplicate later.
      * Each CSV has a header line with the column names followed by one
        data line.

    Args:
        batch_df: Spark DataFrame holding the micro-batch.
        batch_id: Micro-batch identifier; used in the CSV file names.
    """
    # Convert to Pandas DataFrame for easier manipulation
    pandas_df = batch_df.toPandas()

    # Rows with a null uuid cannot be de-duplicated; drop them up front.
    has_uuid = pandas_df['uuid'].notna()
    if not has_uuid.all():
        print(f"Batch {batch_id}: skipping {int((~has_uuid).sum())} row(s) without a uuid")
        pandas_df = pandas_df[has_uuid]
    if pandas_df.empty:
        return

    # Keep the original identifiers as de-duplication keys, then apply the
    # noise function to the stored column.
    raw_uuids = pandas_df['uuid']
    pandas_df = pandas_df.assign(uuid=raw_uuids.apply(add_laplace_noise))

    # Make sure the staging folder exists before writing into it.
    os.makedirs(local_folder_path, exist_ok=True)

    for (index, row), raw_uuid in zip(pandas_df.iterrows(), raw_uuids):
        if bloom_filter.check(raw_uuid):
            continue  # (probably) already exported

        # Process the row since it's not a duplicate
        print(row)

        # Save the row to a CSV file locally. The Series is turned into a
        # one-row frame so the file carries the column names as its header.
        file_path = os.path.join(local_folder_path, f"batch_{batch_id}_{index}.csv")
        row.to_frame().T.to_csv(file_path, index=False)
        print(f"Row saved to: {file_path}")

        try:
            # Upload the file to Google Cloud Storage
            upload_to_gcs(file_path)
        finally:
            # Delete the local file whether or not the upload succeeded
            os.remove(file_path)

        # Record the UUID only once the row has actually been uploaded.
        bloom_filter.add(raw_uuid)

# Function to upload file to Google Cloud Storage
def upload_to_gcs(file_path):
    """Upload a local file to the bucket named by ``bucket_name``.

    The object is stored under the file's base name (directory part removed),
    and a confirmation line with the resulting gs:// URI is printed.

    Args:
        file_path: Path of the local file to upload.
    """
    target_bucket = storage.Client().bucket(bucket_name)
    blob_name = os.path.basename(file_path)
    target_bucket.blob(blob_name).upload_from_filename(file_path)
    print(f"File uploaded to GCS: gs://{bucket_name}/{blob_name}")

# Hand every micro-batch of value_df to process_batch and start the query
query = (
    value_df.writeStream
    .foreachBatch(process_batch)
    .start()
)

# Block this cell until the streaming query terminates
query.awaitTermination()


24/03/29 14:48:58 WARN Utils: Your hostname, Priyas-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.68.59 instead (on interface en0)
24/03/29 14:48:58 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/priyasuresh/.ivy2/cache
The jars for the packages stored in: /Users/priyasuresh/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e936e6a3-eeff-4b5d-bb5d-4857ec7ae283;1.0
	confs: [default]


:: loading settings :: url = jar:file:/Users/priyasuresh/.pyenv/versions/3.10.4/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.1 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution report :: resolve 198ms :: artifacts dl 5ms
	:: modules in use:
	com.google.code.findbugs#jsr305;3.0.0 from central in [default]
	commons-logging#commons-logging;1.1.3 from central in [default]
	org.apache.commons#commons-pool2;2.11.1 from central in [default]
	org.apache.hadoop#hadoop-client-api;3.3.4 from central in [default]
	org.apache.hadoop#h

symbol_id                      BITFINEX_SPOT_BTC_USDT
sequence                                        11111
time_exchange              2024-03-29 14:49:01.380000
time_coinapi               2024-03-29 14:49:01.399862
uuid             1cf365cd-3649-443e-b426-798a7dffb99d
price                                         69453.0
size                                           0.0004
taker_side                                       SELL
Name: 0, dtype: object
Row saved to: /Users/priyasuresh/googlecloud/batch_1_0.csv
File uploaded to GCS: gs://bigdataproject_cryptocurrency/batch_1_0.csv


                                                                                

symbol_id                       BITFINEX_SPOT_BTC_USD
sequence                                        44123
time_exchange              2024-03-29 14:49:04.711000
time_coinapi               2024-03-29 14:49:04.729673
uuid             a79b0eb9-fff2-476e-aaa8-64cf9dace6aa
price                                         69464.0
size                                            0.003
taker_side                                        BUY
Name: 0, dtype: object
Row saved to: /Users/priyasuresh/googlecloud/batch_2_0.csv
File uploaded to GCS: gs://bigdataproject_cryptocurrency/batch_2_0.csv
symbol_id                       BITFINEX_SPOT_BTC_USD
sequence                                        44125
time_exchange              2024-03-29 14:49:04.711000
time_coinapi               2024-03-29 14:49:04.729741
uuid             6a2cc531-894e-4cb4-a1ab-3b93919d5cf2
price                                         69464.0
size                                         0.000821
taker_side                           

                                                                                

symbol_id                         KRAKEN_SPOT_BTC_USD
sequence                                        26435
time_exchange              2024-03-29 14:49:15.610754
time_coinapi               2024-03-29 14:49:15.632702
uuid             20c47512-24ec-47da-ad81-0958c5bb16e2
price                                         69496.6
size                                           0.0009
taker_side                                        BUY
Name: 0, dtype: object
Row saved to: /Users/priyasuresh/googlecloud/batch_5_0.csv
File uploaded to GCS: gs://bigdataproject_cryptocurrency/batch_5_0.csv
symbol_id                         KRAKEN_SPOT_BTC_USD
sequence                                        26437
time_exchange              2024-03-29 14:49:15.610754
time_coinapi               2024-03-29 14:49:15.632702
uuid             19bbb3e9-e583-40b1-82e1-fa55b8e98918
price                                         69496.6
size                                          0.00261
taker_side                           

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/Users/priyasuresh/.pyenv/versions/3.10.4/lib/python3.10/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
RuntimeError: reentrant call inside <_io.BufferedReader name=80>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/priyasuresh/.pyenv/versions/3.10.4/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/Users/priyasuresh/.pyenv/versions/3.10.4/lib/python3.10/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/Users/priyasuresh/.pyenv/versions/3.10.4/lib/python3.10/site-packages/py4j/clients

File uploaded to GCS: gs://bigdataproject_cryptocurrency/batch_25_9.csv
symbol_id                       BITSTAMP_SPOT_BTC_USD
sequence                                        18976
time_exchange              2024-03-29 14:50:34.893000
time_coinapi               2024-03-29 14:50:34.953812
uuid             30ea7fd5-99c3-4099-90a1-db87762859a9
price                                         69479.0
size                                         0.431782
taker_side                                       SELL
Name: 11, dtype: object
Row saved to: /Users/priyasuresh/googlecloud/batch_25_11.csv


Py4JError: An error occurred while calling o51.awaitTermination

File uploaded to GCS: gs://bigdataproject_cryptocurrency/batch_25_11.csv
symbol_id                       BITSTAMP_SPOT_BTC_USD
sequence                                        18978
time_exchange              2024-03-29 14:50:34.893000
time_coinapi               2024-03-29 14:50:34.953865
uuid             06ce6981-7f73-4833-9c4e-b0d38a1aad21
price                                         69478.0
size                                         0.006512
taker_side                                       SELL
Name: 13, dtype: object
Row saved to: /Users/priyasuresh/googlecloud/batch_25_13.csv
File uploaded to GCS: gs://bigdataproject_cryptocurrency/batch_25_13.csv
symbol_id                       BITSTAMP_SPOT_BTC_USD
sequence                                        18980
time_exchange              2024-03-29 14:50:34.893000
time_coinapi               2024-03-29 14:50:34.953893
uuid             9b9b847e-ff60-4ced-b6ad-8a9752954ab0
price                                         69473.0
size         

                                                                                

symbol_id                         KRAKEN_SPOT_BTC_USD
sequence                                        26487
time_exchange              2024-03-29 14:51:01.897355
time_coinapi               2024-03-29 14:51:01.917608
uuid             74895ba4-e1af-49f0-b09c-8061fafe71fe
price                                         69498.0
size                                         0.001402
taker_side                                        BUY
Name: 0, dtype: object
Row saved to: /Users/priyasuresh/googlecloud/batch_31_0.csv
File uploaded to GCS: gs://bigdataproject_cryptocurrency/batch_31_0.csv


In [2]:
# Stop the streaming query that was started in the first cell
query.stop()

In [3]:
# Shut down the Spark session created in the first cell
spark.stop()