In [1]:
sc

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, DoubleType, LongType, TimestampType

In [3]:
spark = SparkSession.builder \
    .appName("KafkaStockStream") \
    .getOrCreate()

24/09/22 13:52:24 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [4]:
schema = StructType() \
    .add("ticker", StringType()) \
    .add("date", TimestampType()) \
    .add("open", DoubleType()) \
    .add("high", DoubleType()) \
    .add("low", DoubleType()) \
    .add("close", DoubleType()) \
    .add("volume", LongType())

In [5]:
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "stock_data_topic") \
    .load()


In [6]:
stock_df = df.selectExpr("CAST(value AS STRING) as json_data")

In [7]:
parsed_df = stock_df.select(from_json(col("json_data"), schema).alias("data")).select("data.*")


In [8]:
def write_to_cassandra(df, epoch_id):
    df.write \
        .format("org.apache.spark.sql.cassandra") \
        .mode("append") \
        .option("keyspace", "stockmarket") \
        .option("table", "stocks") \
        .save()
    print('sent to cassandra')

In [9]:
# Define MongoDB connection details
from pymongo import MongoClient
uri = "mongodb+srv://ahmed:ZUp0kDbdhCp8QmPr@cluster0.mdxcz.mongodb.net/retryWrites=true&w=majority&appName=Cluster0"
database_name = "stock_market"
# collection_name = "faang_data"
def write_to_mongodb(df, epoch_id):
    # Convert Spark DataFrame to Pandas DataFrame
    pdf = df.toPandas()
    
    if not pdf.empty:    
        client = MongoClient("mongodb+srv://Ahmed:f74YXwZ8S09rUlg8@cluster0.mdxcz.mongodb.net/")
        db = client["stock_market"]
        collection = db["faang_data"]
        records = pdf.to_dict(orient='records')
        collection.insert_many(records)
        print("sent to mongo")
    else:
        print(f"Skipping write to MongoDB for epoch {epoch_id} due to empty DataFrame")

In [10]:
query = parsed_df.writeStream \
    .foreachBatch(lambda df, epoch_id: (
        write_to_mongodb(df, epoch_id),
        write_to_cassandra(df, epoch_id)
    )) \
    .start()

query.awaitTermination()

24/09/22 13:53:41 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-cefabef7-9235-4535-b471-4b03e0e84c36. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/09/22 13:53:42 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/09/22 13:53:49 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


Skipping write to MongoDB for epoch 0 due to empty DataFrame


                                                                                

sent to cassandra


                                                                                

sent to mongo


                                                                                

sent to cassandra
sent to mongo
sent to cassandra
sent to mongo
sent to cassandra
sent to mongo
sent to cassandra
sent to mongo
sent to cassandra


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:
query.stop()

In [11]:
df = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="stocks", keyspace="stockmarket") \
    .load()

df.show()

24/09/22 13:57:26 WARN V2ScanPartitioningAndOrdering: Spark ignores the partitioning CassandraPartitioning. Please use KeyGroupedPartitioning for better performance


+------+-------------------+------------------+------------------+------------------+------------------+------+
|ticker|               date|             close|              high|               low|              open|volume|
+------+-------------------+------------------+------------------+------------------+------------------+------+
|  AAPL|2024-09-18 15:23:00| 220.1649932861328| 220.1649932861328| 220.1649932861328| 220.1649932861328|  NULL|
|  AAPL|2024-09-18 15:24:00| 220.5406036376953| 220.5406036376953| 220.5406036376953| 220.5406036376953|  NULL|
|  AAPL|2024-09-18 15:25:00| 221.1199951171875| 221.1199951171875| 221.1199951171875| 221.1199951171875|  NULL|
|  AAPL|2024-09-18 15:26:00| 221.4499969482422| 221.4499969482422| 221.4499969482422| 221.4499969482422|  NULL|
|  AAPL|2024-09-18 15:27:00|221.72999572753906|221.72999572753906|221.72999572753906|221.72999572753906|  NULL|
|  AAPL|2024-09-18 15:28:00|221.57000732421875|221.75999450683594|221.50999450683594| 221.6699981689453|

[Stage 14:>                                                         (0 + 1) / 1]                                                                                

sent to cassandra


[Stage 15:>                                                         (0 + 1) / 1]                                                                                

sent to mongo
sent to cassandra


In [None]:
query = parsed_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()